[hornetq-commits] JBoss hornetq SVN: r12278 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 9 13:01:23 EST 2012


Author: jbertram
Date: 2012-03-09 13:01:22 -0500 (Fri, 09 Mar 2012)
New Revision: 12278

Added:
   branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Modified:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
Log:
[HORNETQ-787] added integration testing, finished import and export

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-09 18:01:22 UTC (rev 12278)
@@ -52,6 +52,7 @@
    public static final String QUEUE_NAME = "name";
    public static final String PROPERTY_TYPE_BOOLEAN = "boolean";
    public static final String PROPERTY_TYPE_BYTE = "byte";
+   public static final String PROPERTY_TYPE_BYTES = "bytes";
    public static final String PROPERTY_TYPE_SHORT = "short";
    public static final String PROPERTY_TYPE_INTEGER = "integer";
    public static final String PROPERTY_TYPE_LONG = "long";
@@ -59,5 +60,5 @@
    public static final String PROPERTY_TYPE_DOUBLE = "double";
    public static final String PROPERTY_TYPE_STRING = "string";
    public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
-   public static final int CHUNK = 1000;
+   public static final Long CHUNK = 1000L;
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-09 18:01:22 UTC (rev 12278)
@@ -17,21 +17,37 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.remoting.impl.netty.TransportConstants;
 import org.hornetq.utils.Base64;
 
 import javax.xml.stream.XMLInputFactory;
 import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
@@ -48,21 +64,41 @@
 
    ClientSession session;
 
+   boolean localSession = false;
+
+   Map<String, String> addressMap = new HashMap<String, String>();
+   
+   String tempFileName = "";
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public XmlDataReader(String inputFile, String host, String port)
+   public XmlDataReader(InputStream inputStream, ClientSession session)
    {
       try
       {
-         reader = XMLInputFactory.newInstance().createXMLStreamReader(new java.io.FileInputStream(inputFile));
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+         this.session = session;
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   public XmlDataReader(InputStream inputStream, String host, String port)
+   {
+      try
+      {
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
          HashMap<String, Object> connectionParams = new HashMap<String, Object>();
          connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
          connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
          ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
          ClientSessionFactory sf = serverLocator.createSessionFactory();
          session = sf.createSession(false, true, true);
+         localSession = true;
       }
       catch (Exception e)
       {
@@ -70,6 +106,11 @@
       }
    }
 
+   public XmlDataReader(String inputFile, String host, String port) throws FileNotFoundException
+   {
+      this(new FileInputStream(inputFile), host, port);
+   }
+
    // Public --------------------------------------------------------
 
    public static void main(String arg[])
@@ -83,7 +124,7 @@
       try
       {
          XmlDataReader xmlDataReader = new XmlDataReader(arg[0], arg[1], arg[2]);
-         xmlDataReader.readXMLData();
+         xmlDataReader.processXml();
       }
       catch (Exception e)
       {
@@ -91,36 +132,42 @@
       }
    }
 
-   private void readXMLData() throws Exception
+   public void processXml() throws Exception
    {
-      while (reader.hasNext())
+      try
       {
-         log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
-         if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
+         while (reader.hasNext())
          {
-            if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+            log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
             {
-               bindQueue();
+               if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+               {
+                  bindQueue();
+               }
+               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessage();
+               }
             }
-            if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
-            {
-               sendMessage();
-            }
+            reader.next();
          }
-         reader.next();
+      } finally
+      {
+         if (localSession)
+         {
+            session.close();
+         }
       }
-
-      session.close();
    }
 
-   private void sendMessage() throws Exception
+   private void processMessage() throws Exception
    {
-      byte type = 0;
-      boolean isLarge = false;
-      byte priority = 0;
-      long expiration = 0;
-      long timestamp = 0;
-//      HashMap<String, Object> properties = new HashMap<String, Object>();
+      Byte type = 0;
+      Byte priority = 0;
+      Long expiration = 0L;
+      Long timestamp = 0L;
+      org.hornetq.utils.UUID userId = null;
       ArrayList<String> queues = new ArrayList<String>();
 
       for (int i = 0; i < reader.getAttributeCount(); i++)
@@ -128,31 +175,8 @@
          String attributeName = reader.getAttributeLocalName(i);
          if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
          {
-            String value = reader.getAttributeValue(i);
-            if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
-            {
-               type = Message.DEFAULT_TYPE;
-            } else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
-            {
-               type = Message.BYTES_TYPE;
-            } else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
-            {
-               type = Message.MAP_TYPE;
-            } else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
-            {
-               type = Message.OBJECT_TYPE;
-            } else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
-            {
-               type = Message.STREAM_TYPE;
-            } else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
-            {
-               type = Message.TEXT_TYPE;
-            }
+            type = getMessageType(reader.getAttributeValue(i));
          }
-         else if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName))
-         {
-            isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
-         }
          else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
          {
             priority = Byte.parseByte(reader.getAttributeValue(i));
@@ -165,13 +189,16 @@
          {
             timestamp = Long.parseLong(reader.getAttributeValue(i));
          }
+         else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
+         {
+            userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
+         }
       }
 
       Message message = session.createMessage(type, true, expiration, timestamp, priority);
+      message.setUserID(userId);
 
       boolean endLoop = false;
-//      StringBuilder propertiesString = new StringBuilder();
-//      StringBuilder queuesString = new StringBuilder();
 
       while (reader.hasNext())
       {
@@ -180,85 +207,15 @@
             case XMLStreamConstants.START_ELEMENT:
                if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
                {
-                  reader.next();
-                  int start = reader.getTextStart();
-                  int length = reader.getTextLength();
-                  message.getBodyBuffer().writeBytes(decode(new String(reader.getTextCharacters(), start, length).trim()));
+                  processMessageBody(message);
                }
                else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
                {
-                  String key = "";
-                  String value = "";
-                  String propertyType = "";
-
-                  for (int i = 0; i < reader.getAttributeCount(); i++)
-                  {
-                     String attributeName = reader.getAttributeLocalName(i);
-                     if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
-                     {
-//                        propertiesString.append("(" + XmlDataConstants.PROPERTY_NAME + "=" + reader.getAttributeValue(i));
-                        key = reader.getAttributeValue(i);
-                     }
-                     else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
-                     {
-//                        propertiesString.append("," + XmlDataConstants.PROPERTY_VALUE + "=" + reader.getAttributeValue(i));
-                        value = reader.getAttributeValue(i);
-                     }
-                     else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
-                     {
-//                        propertiesString.append(", " + XmlDataConstants.PROPERTY_TYPE + "=" + reader.getAttributeValue(i) + ") ");
-                        propertyType = reader.getAttributeValue(i);
-                     }
-                  }
-
-                  if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
-                  {
-                     message.putShortProperty(key, Short.parseShort(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
-                  {
-                     message.putBooleanProperty(key, Boolean.parseBoolean(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
-                  {
-                     message.putByteProperty(key, Byte.parseByte(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
-                  {
-                     message.putDoubleProperty(key, Double.parseDouble(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
-                  {
-                     message.putFloatProperty(key, Float.parseFloat(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
-                  {
-                     message.putIntProperty(key, Integer.parseInt(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
-                  {
-                     message.putLongProperty(key, Long.parseLong(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
-                  {
-                     message.putStringProperty(new SimpleString(key), new SimpleString(value));
-                  }
-                  else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
-                  {
-                     message.putStringProperty(key, value);
-                  }
+                  processMessageProperties(message);
                }
                else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
                {
-                  for (int i = 0; i < reader.getAttributeCount(); i++)
-                  {
-                     String attributeName = reader.getAttributeLocalName(i);
-                     if (XmlDataConstants.QUEUE_NAME.equals(attributeName))
-                     {
-//                        queuesString.append("(" + XmlDataConstants.QUEUE_NAME + "=" + reader.getAttributeValue(i) + ")");
-                        queues.add(reader.getAttributeValue(i));
-                     }
-                  }
+                  processMessageQueues(queues);
                }
                break;
             case XMLStreamConstants.END_ELEMENT:
@@ -275,17 +232,191 @@
          reader.next();
       }
 
-      for(String queueName : queues)
+      sendMessage(queues, message);
+   }
+
+   private Byte getMessageType(String value)
+   {
+      Byte type = Message.DEFAULT_TYPE;
+      if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
       {
-         System.out.println("To " + queueName + ": " + message);
-         ClientProducer producer = session.createProducer(queueName);
-         producer.send(message);
+         type = Message.DEFAULT_TYPE;
       }
+      else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
+      {
+         type = Message.MAP_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
+      {
+         type = Message.OBJECT_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
+      {
+         type = Message.STREAM_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
+      {
+         type = Message.TEXT_TYPE;
+      }
+      return type;
+   }
 
-//      System.out.println("Sending message (type=" + type + ", isLarge=" + isLarge + ", priority=" + priority + ", expiration=" + expiration + ", timestamp=" + timestamp + ", properties=" + properties + ", queues=" + queues + ", body=" + new String(decode(body.trim())) + ")");
+   private void sendMessage(ArrayList<String> queues, Message message) throws Exception
+   {
+//      System.out.print("To " + addressMap.get(queues.get(0)) + ": " + message + " (routed to: ");
+      ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+      for (String queue : queues)
+      {
+         ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
+         ClientMessage managementMessage = session.createMessage(false);
+         ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
+         session.start();
+         ClientMessage reply = requestor.request(managementMessage);
+         long queueID = (Integer) ManagementHelper.getResult(reply);
+//         System.out.print(queue + ", ");
+         session.queueQuery(new SimpleString(queue));
+         buffer.putLong(queueID);
+      }
+//      System.out.println(")");
+      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      ClientProducer producer = session.createProducer(addressMap.get(queues.get(0)));
+      producer.send(message);
+      producer.close();
 
+      File tempFile = new File(tempFileName);
+      if (!tempFile.delete())
+      {
+         System.err.println("Couldn't delete " + tempFileName);
+      }
    }
 
+   private void processMessageQueues(ArrayList<String> queues)
+   {
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
+         {
+            queues.add(reader.getAttributeValue(i));
+         }
+      }
+   }
+
+   private void processMessageProperties(Message message)
+   {
+      String key = "";
+      String value = "";
+      String propertyType = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
+         {
+            key = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
+         {
+            value = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
+         {
+            propertyType = reader.getAttributeValue(i);
+         }
+      }
+
+      if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
+      {
+         message.putShortProperty(key, Short.parseShort(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
+      {
+         message.putBooleanProperty(key, Boolean.parseBoolean(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
+      {
+         message.putByteProperty(key, Byte.parseByte(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
+      {
+         message.putBytesProperty(key, decode(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
+      {
+         message.putDoubleProperty(key, Double.parseDouble(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
+      {
+         message.putFloatProperty(key, Float.parseFloat(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
+      {
+         message.putIntProperty(key, Integer.parseInt(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
+      {
+         message.putLongProperty(key, Long.parseLong(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
+      {
+         message.putStringProperty(new SimpleString(key), new SimpleString(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
+      {
+         message.putStringProperty(key, value);
+      }
+   }
+
+   private void processMessageBody(Message message) throws XMLStreamException, IOException
+   {
+      boolean isLarge = false;
+
+      if (reader.getAttributeCount() > 0)
+      {
+         isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
+      }
+      reader.next();
+      if (isLarge)
+      {
+         tempFileName = UUID.randomUUID().toString() + ".tmp";
+         OutputStream out = new FileOutputStream(tempFileName);
+         while (reader.hasNext())
+         {
+            if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
+            {
+               break;
+            }
+            else
+            {
+//               System.out.println("Reading " + reader.getTextLength() + " characters.");
+               String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+               String trimmedCharacters = characters.trim();
+               if (trimmedCharacters.length() > 0)
+               {
+                  byte[] data = decode(trimmedCharacters);
+//                  System.out.println(new String(data));
+//                  System.out.println("Writing " + data.length + " bytes.");
+                  out.write(data);
+               }
+            }
+            reader.next();
+         }
+         out.close();
+         FileInputStream fileInputStream = new FileInputStream(tempFileName);
+         BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+         ((ClientMessage) message).setBodyInputStream(bufferedInput);
+      }
+      else
+      {
+         reader.next(); // step past the "indentation" characters to get to the CDATA
+         String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+         message.getBodyBuffer().writeBytes(decode(characters.trim()));
+      }
+   }
+
    private void bindQueue() throws Exception
    {
       String queueName = "";
@@ -309,9 +440,16 @@
          }
       }
 
-      session.createQueue(address, queueName, filter, true);
+      ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
 
-      System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+      if (!queueQuery.isExists())
+      {
+         session.createQueue(address, queueName, filter, true);
+      }
+
+      addressMap.put(queueName, address);
+
+//      System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
    }
 
    // Package protected ---------------------------------------------
@@ -322,7 +460,7 @@
 
    private static byte[] decode(String data)
    {
-      return Base64.decode(data);
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-09 18:01:22 UTC (rev 12278)
@@ -64,12 +64,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.MessageDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
 
 /**
  * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
@@ -118,7 +113,7 @@
             return executor;
          }
       };
-      
+
       storageManager = new JournalStorageManager(config, executorFactory);
 
       messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
@@ -140,7 +135,8 @@
                XMLStreamWriter.class.getClassLoader(),
                new Class[]{XMLStreamWriter.class},
                handler);
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
          e.printStackTrace();
       }
@@ -157,7 +153,7 @@
       }
 
       try
-      {     
+      {
 //         long start = System.currentTimeMillis();
          XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
          xmlDataWriter.writeXMLData();
@@ -194,7 +190,7 @@
 
       messageJournal.start();
 
-      ((JournalImpl)messageJournal).load(records, null, null, false);
+      ((JournalImpl) messageJournal).load(records, null, null, false);
 
       for (RecordInfo info : records)
       {
@@ -294,7 +290,7 @@
 
       bindingsJournal.start();
 
-      ((JournalImpl)bindingsJournal).load(records, null, null, false);
+      ((JournalImpl) bindingsJournal).load(records, null, null, false);
 
       for (RecordInfo info : records)
       {
@@ -456,26 +452,55 @@
       {
          xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
          LargeServerMessage largeMessage = (LargeServerMessage) message;
+         BodyEncoder encoder = null;
 
          try
          {
-            BodyEncoder encoder = largeMessage.getBodyEncoder();
+            encoder = largeMessage.getBodyEncoder();
             encoder.open();
-            for (long i = 0; i < encoder.getLargeBodySize(); i += XmlDataConstants.CHUNK)
+            long totalBytesWritten = 0;
+            Long bufferSize;
+            long bodySize = encoder.getLargeBodySize();
+            for (long i = 0; i < bodySize; i += XmlDataConstants.CHUNK)
             {
-               HornetQBuffer buffer = HornetQBuffers.fixedBuffer(XmlDataConstants.CHUNK);
-               encoder.encode(buffer, XmlDataConstants.CHUNK);
-               xmlWriter.writeCharacters(encode(buffer.toByteBuffer().array()));
+               Long remainder = bodySize - totalBytesWritten;
+               if (remainder >= XmlDataConstants.CHUNK)
+               {
+                  bufferSize = XmlDataConstants.CHUNK;
+               }
+               else
+               {
+                  bufferSize = remainder;
+               }
+               HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+               encoder.encode(buffer, bufferSize.intValue());
+               xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+               totalBytesWritten += bufferSize;
             }
+            encoder.close();
          }
          catch (HornetQException e)
          {
             e.printStackTrace();
          }
+         finally
+         {
+            if (encoder != null)
+            {
+               try
+               {
+                  encoder.close();
+               }
+               catch (HornetQException e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         }
       }
       else
       {
-         xmlWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
+         xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
       }
       xmlWriter.writeEndElement(); // end MESSAGE_BODY
    }
@@ -499,7 +524,15 @@
          Object value = message.getObjectProperty(key);
          xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
          xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
-         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+         if (value instanceof byte[])
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
+         }
+         else
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+         }
+
          if (value instanceof Boolean)
          {
             xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
@@ -536,6 +569,10 @@
          {
             xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
          }
+         else if (value instanceof byte[])
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
+         }
       }
       xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
    }
@@ -626,7 +663,7 @@
             target.writeCharacters(LINE_SEPARATOR);
             target.writeCharacters(indent(depth));
          }
-         else if ("writeEmptyElement".equals(m) || "writeCharacters".equals(m))
+         else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
          {
             target.writeCharacters(LINE_SEPARATOR);
             target.writeCharacters(indent(depth));

Added: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	2012-03-09 18:01:22 UTC (rev 12278)
@@ -0,0 +1,468 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.XmlDataReader;
+import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * A ExportFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class XmlImportExportTest extends ServiceTestBase
+{
+   public static final int CONSUMER_TIMEOUT = 5000;
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   protected void tearDown() throws Exception
+   {
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void testMessageProperties() throws Exception
+   {
+      final String QUEUE_NAME = "A1";
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+      ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+      StringBuilder international = new StringBuilder();
+      for (char x = 800; x < 1200; x++)
+      {
+         international.append(x);
+      }
+
+      String special = "\"<>'&";
+
+      for (int i = 0; i < 5; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.getBodyBuffer().writeString("Bob the giant pig " + i);
+         msg.putBooleanProperty("myBooleanProperty", Boolean.TRUE);
+         msg.putByteProperty("myByteProperty", new Byte("0"));
+         msg.putBytesProperty("myBytesProperty", new byte[]{0, 1, 2, 3, 4});
+         msg.putDoubleProperty("myDoubleProperty", i * 1.6);
+         msg.putFloatProperty("myFloatProperty", i * 2.5F);
+         msg.putIntProperty("myIntProperty", i);
+         msg.putLongProperty("myLongProperty", Long.MAX_VALUE - i);
+         msg.putObjectProperty("myObjectProperty", i);
+         msg.putShortProperty("myShortProperty", new Integer(i).shortValue());
+         msg.putStringProperty("myStringProperty", "myStringPropertyValue_" + i);
+         msg.putStringProperty("myNonAsciiStringProperty", international.toString());
+         msg.putStringProperty("mySpecialCharacters", special);
+         producer.send(msg);
+      }
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+      session.start();
+
+      for (int i = 0; i < 5; i++)
+      {
+         ClientMessage msg = consumer.receive(CONSUMER_TIMEOUT);
+         byte[] body = new byte[msg.getBodySize()];
+         msg.getBodyBuffer().readBytes(body);
+         Assert.assertTrue(new String(body).contains("Bob the giant pig " + i));
+         Assert.assertEquals(msg.getBooleanProperty("myBooleanProperty"), Boolean.TRUE);
+         Assert.assertEquals(msg.getByteProperty("myByteProperty"), new Byte("0"));
+         byte[] bytes = msg.getBytesProperty("myBytesProperty");
+         for (int j = 0; j < 5; j++)
+         {
+            Assert.assertEquals(j, bytes[j]);
+         }
+         Assert.assertEquals(i * 1.6, msg.getDoubleProperty("myDoubleProperty"));
+         Assert.assertEquals(i * 2.5F, msg.getFloatProperty("myFloatProperty"));
+         Assert.assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
+         Assert.assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
+         Assert.assertEquals(i, msg.getObjectProperty("myObjectProperty"));
+         Assert.assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
+         Assert.assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
+         Assert.assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
+         Assert.assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
+      }
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
+   public void testMessageTypes() throws Exception
+   {
+      final String QUEUE_NAME = "A1";
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+      ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+
+      ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(Message.DEFAULT_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(Message.MAP_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(Message.OBJECT_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(Message.STREAM_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(Message.TEXT_TYPE, true);
+      producer.send(msg);
+      msg = session.createMessage(true);
+      producer.send(msg);
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+      session.start();
+
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.BYTES_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.DEFAULT_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.MAP_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.OBJECT_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.STREAM_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.TEXT_TYPE, msg.getType());
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Message.DEFAULT_TYPE, msg.getType());
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
+   public void testMessageAttributes() throws Exception
+   {
+      final String QUEUE_NAME = "A1";
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+      ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+      ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
+      msg.setExpiration(Long.MAX_VALUE);
+      msg.setPriority((byte) 0);
+      msg.setTimestamp(Long.MAX_VALUE - 1);
+      msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+      producer.send(msg);
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+      session.start();
+
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertEquals(Long.MAX_VALUE, msg.getExpiration());
+      Assert.assertEquals((byte) 0, msg.getPriority());
+      Assert.assertEquals(Long.MAX_VALUE - 1, msg.getTimestamp());
+      Assert.assertNotNull(msg.getUserID());
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
+   public void testBindingAttributes() throws Exception
+   {
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue("addressName1", "queueName1");
+      session.createQueue("addressName1", "queueName2", "bob", true);
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+
+      ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
+
+      Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+      Assert.assertNull(queueQuery.getFilterString());
+
+      queueQuery = session.queueQuery(new SimpleString("queueName2"));
+
+      Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+      Assert.assertEquals("bob", queueQuery.getFilterString().toString());
+      Assert.assertEquals(Boolean.TRUE.booleanValue(), queueQuery.isDurable());
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
+   public void testLargeMessage() throws Exception
+   {
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, false);
+
+      try
+      {
+         LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+
+         fileMessage.setMessageID(1005);
+         fileMessage.setDurable(true);
+
+         for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+         {
+            fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
+         }
+
+         fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+//         fileMessage.releaseResources();
+
+         session.createQueue("A", "A");
+
+         ClientProducer prod = session.createProducer("A");
+
+         prod.send(fileMessage);
+
+         fileMessage.deleteFile();
+
+         session.commit();
+
+         session.close();
+         locator.close();
+         server.stop();
+
+         ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+         XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+         xmlDataWriter.writeXMLData();
+         System.out.print(new String(xmlOutputStream.toByteArray()));
+
+         clearData();
+         server.start();
+         locator = createFactory(false);
+         factory = locator.createSessionFactory();
+         session = factory.createSession(false, true, true);
+
+         ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+         XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+         xmlDataReader.processXml();
+         session.close();
+         session = factory.createSession(false, false);
+         session.start();
+
+         ClientConsumer cons = session.createConsumer("A");
+
+         ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
+
+         Assert.assertNotNull(msg);
+
+         Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
+
+         for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+         {
+            Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
+         }
+
+         msg.acknowledge();
+         session.commit();
+      }
+      finally
+      {
+         session.close();
+         factory.close();
+         locator.close();
+         server.stop();
+      }
+   }
+
+   public void testPartialQueue() throws Exception
+   {
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue("myAddress", "myQueue1");
+      session.createQueue("myAddress", "myQueue2");
+
+      ClientProducer producer = session.createProducer("myAddress");
+
+      ClientMessage msg = session.createMessage(true);
+      producer.send(msg);
+      
+      ClientConsumer consumer = session.createConsumer("myQueue1");
+      session.start();
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertNotNull(msg);
+      msg.acknowledge();
+      consumer.close();
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataWriter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+      xmlDataReader.processXml();
+      consumer = session.createConsumer("myQueue1");
+      session.start();
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertNull(msg);
+      consumer.close();
+
+      consumer = session.createConsumer("myQueue2");
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertNotNull(msg);
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list