[hornetq-commits] JBoss hornetq SVN: r12278 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Mar 9 13:01:23 EST 2012
Author: jbertram
Date: 2012-03-09 13:01:22 -0500 (Fri, 09 Mar 2012)
New Revision: 12278
Added:
branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Modified:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
Log:
[HORNETQ-787] added integration testing, finished import and export
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -52,6 +52,7 @@
public static final String QUEUE_NAME = "name";
public static final String PROPERTY_TYPE_BOOLEAN = "boolean";
public static final String PROPERTY_TYPE_BYTE = "byte";
+ public static final String PROPERTY_TYPE_BYTES = "bytes";
public static final String PROPERTY_TYPE_SHORT = "short";
public static final String PROPERTY_TYPE_INTEGER = "integer";
public static final String PROPERTY_TYPE_LONG = "long";
@@ -59,5 +60,5 @@
public static final String PROPERTY_TYPE_DOUBLE = "double";
public static final String PROPERTY_TYPE_STRING = "string";
public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
- public static final int CHUNK = 1000;
+ public static final Long CHUNK = 1000L;
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -17,21 +17,37 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.utils.Base64;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
/**
* @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
@@ -48,21 +64,41 @@
ClientSession session;
+ boolean localSession = false;
+
+ Map<String, String> addressMap = new HashMap<String, String>();
+
+ String tempFileName = "";
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public XmlDataReader(String inputFile, String host, String port)
+ public XmlDataReader(InputStream inputStream, ClientSession session)
{
try
{
- reader = XMLInputFactory.newInstance().createXMLStreamReader(new java.io.FileInputStream(inputFile));
+ reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+ this.session = session;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public XmlDataReader(InputStream inputStream, String host, String port)
+ {
+ try
+ {
+ reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
HashMap<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
ClientSessionFactory sf = serverLocator.createSessionFactory();
session = sf.createSession(false, true, true);
+ localSession = true;
}
catch (Exception e)
{
@@ -70,6 +106,11 @@
}
}
+ public XmlDataReader(String inputFile, String host, String port) throws FileNotFoundException
+ {
+ this(new FileInputStream(inputFile), host, port);
+ }
+
// Public --------------------------------------------------------
public static void main(String arg[])
@@ -83,7 +124,7 @@
try
{
XmlDataReader xmlDataReader = new XmlDataReader(arg[0], arg[1], arg[2]);
- xmlDataReader.readXMLData();
+ xmlDataReader.processXml();
}
catch (Exception e)
{
@@ -91,36 +132,42 @@
}
}
- private void readXMLData() throws Exception
+ public void processXml() throws Exception
{
- while (reader.hasNext())
+ try
{
- log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
- if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
+ while (reader.hasNext())
{
- if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+ log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+ if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
{
- bindQueue();
+ if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+ {
+ bindQueue();
+ }
+ if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+ {
+ processMessage();
+ }
}
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
- {
- sendMessage();
- }
+ reader.next();
}
- reader.next();
+ } finally
+ {
+ if (localSession)
+ {
+ session.close();
+ }
}
-
- session.close();
}
- private void sendMessage() throws Exception
+ private void processMessage() throws Exception
{
- byte type = 0;
- boolean isLarge = false;
- byte priority = 0;
- long expiration = 0;
- long timestamp = 0;
-// HashMap<String, Object> properties = new HashMap<String, Object>();
+ Byte type = 0;
+ Byte priority = 0;
+ Long expiration = 0L;
+ Long timestamp = 0L;
+ org.hornetq.utils.UUID userId = null;
ArrayList<String> queues = new ArrayList<String>();
for (int i = 0; i < reader.getAttributeCount(); i++)
@@ -128,31 +175,8 @@
String attributeName = reader.getAttributeLocalName(i);
if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
{
- String value = reader.getAttributeValue(i);
- if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
- {
- type = Message.DEFAULT_TYPE;
- } else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
- {
- type = Message.BYTES_TYPE;
- } else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
- {
- type = Message.MAP_TYPE;
- } else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
- {
- type = Message.OBJECT_TYPE;
- } else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
- {
- type = Message.STREAM_TYPE;
- } else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
- {
- type = Message.TEXT_TYPE;
- }
+ type = getMessageType(reader.getAttributeValue(i));
}
- else if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName))
- {
- isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
- }
else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
{
priority = Byte.parseByte(reader.getAttributeValue(i));
@@ -165,13 +189,16 @@
{
timestamp = Long.parseLong(reader.getAttributeValue(i));
}
+ else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
+ {
+ userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
+ }
}
Message message = session.createMessage(type, true, expiration, timestamp, priority);
+ message.setUserID(userId);
boolean endLoop = false;
-// StringBuilder propertiesString = new StringBuilder();
-// StringBuilder queuesString = new StringBuilder();
while (reader.hasNext())
{
@@ -180,85 +207,15 @@
case XMLStreamConstants.START_ELEMENT:
if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
{
- reader.next();
- int start = reader.getTextStart();
- int length = reader.getTextLength();
- message.getBodyBuffer().writeBytes(decode(new String(reader.getTextCharacters(), start, length).trim()));
+ processMessageBody(message);
}
else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
{
- String key = "";
- String value = "";
- String propertyType = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
- {
-// propertiesString.append("(" + XmlDataConstants.PROPERTY_NAME + "=" + reader.getAttributeValue(i));
- key = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
- {
-// propertiesString.append("," + XmlDataConstants.PROPERTY_VALUE + "=" + reader.getAttributeValue(i));
- value = reader.getAttributeValue(i);
- }
- else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
- {
-// propertiesString.append(", " + XmlDataConstants.PROPERTY_TYPE + "=" + reader.getAttributeValue(i) + ") ");
- propertyType = reader.getAttributeValue(i);
- }
- }
-
- if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
- {
- message.putShortProperty(key, Short.parseShort(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
- {
- message.putBooleanProperty(key, Boolean.parseBoolean(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
- {
- message.putByteProperty(key, Byte.parseByte(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
- {
- message.putDoubleProperty(key, Double.parseDouble(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
- {
- message.putFloatProperty(key, Float.parseFloat(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
- {
- message.putIntProperty(key, Integer.parseInt(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
- {
- message.putLongProperty(key, Long.parseLong(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
- {
- message.putStringProperty(new SimpleString(key), new SimpleString(value));
- }
- else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
- {
- message.putStringProperty(key, value);
- }
+ processMessageProperties(message);
}
else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
{
- for (int i = 0; i < reader.getAttributeCount(); i++)
- {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.QUEUE_NAME.equals(attributeName))
- {
-// queuesString.append("(" + XmlDataConstants.QUEUE_NAME + "=" + reader.getAttributeValue(i) + ")");
- queues.add(reader.getAttributeValue(i));
- }
- }
+ processMessageQueues(queues);
}
break;
case XMLStreamConstants.END_ELEMENT:
@@ -275,17 +232,191 @@
reader.next();
}
- for(String queueName : queues)
+ sendMessage(queues, message);
+ }
+
+ private Byte getMessageType(String value)
+ {
+ Byte type = Message.DEFAULT_TYPE;
+ if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
{
- System.out.println("To " + queueName + ": " + message);
- ClientProducer producer = session.createProducer(queueName);
- producer.send(message);
+ type = Message.DEFAULT_TYPE;
}
+ else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
+ {
+ type = Message.BYTES_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
+ {
+ type = Message.MAP_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
+ {
+ type = Message.OBJECT_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
+ {
+ type = Message.STREAM_TYPE;
+ }
+ else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
+ {
+ type = Message.TEXT_TYPE;
+ }
+ return type;
+ }
-// System.out.println("Sending message (type=" + type + ", isLarge=" + isLarge + ", priority=" + priority + ", expiration=" + expiration + ", timestamp=" + timestamp + ", properties=" + properties + ", queues=" + queues + ", body=" + new String(decode(body.trim())) + ")");
+ private void sendMessage(ArrayList<String> queues, Message message) throws Exception
+ {
+// System.out.print("To " + addressMap.get(queues.get(0)) + ": " + message + " (routed to: ");
+ ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+ for (String queue : queues)
+ {
+ ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
+ ClientMessage managementMessage = session.createMessage(false);
+ ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
+ session.start();
+ ClientMessage reply = requestor.request(managementMessage);
+ long queueID = (Integer) ManagementHelper.getResult(reply);
+// System.out.print(queue + ", ");
+ session.queueQuery(new SimpleString(queue));
+ buffer.putLong(queueID);
+ }
+// System.out.println(")");
+ message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+ ClientProducer producer = session.createProducer(addressMap.get(queues.get(0)));
+ producer.send(message);
+ producer.close();
+ File tempFile = new File(tempFileName);
+ if (!tempFile.delete())
+ {
+ System.err.println("Couldn't delete " + tempFileName);
+ }
}
+ private void processMessageQueues(ArrayList<String> queues)
+ {
+ for (int i = 0; i < reader.getAttributeCount(); i++)
+ {
+ if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
+ {
+ queues.add(reader.getAttributeValue(i));
+ }
+ }
+ }
+
+ private void processMessageProperties(Message message)
+ {
+ String key = "";
+ String value = "";
+ String propertyType = "";
+
+ for (int i = 0; i < reader.getAttributeCount(); i++)
+ {
+ String attributeName = reader.getAttributeLocalName(i);
+ if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
+ {
+ key = reader.getAttributeValue(i);
+ }
+ else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
+ {
+ value = reader.getAttributeValue(i);
+ }
+ else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
+ {
+ propertyType = reader.getAttributeValue(i);
+ }
+ }
+
+ if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
+ {
+ message.putShortProperty(key, Short.parseShort(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
+ {
+ message.putBooleanProperty(key, Boolean.parseBoolean(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
+ {
+ message.putByteProperty(key, Byte.parseByte(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
+ {
+ message.putBytesProperty(key, decode(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
+ {
+ message.putDoubleProperty(key, Double.parseDouble(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
+ {
+ message.putFloatProperty(key, Float.parseFloat(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
+ {
+ message.putIntProperty(key, Integer.parseInt(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
+ {
+ message.putLongProperty(key, Long.parseLong(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
+ {
+ message.putStringProperty(new SimpleString(key), new SimpleString(value));
+ }
+ else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
+ {
+ message.putStringProperty(key, value);
+ }
+ }
+
+ private void processMessageBody(Message message) throws XMLStreamException, IOException
+ {
+ boolean isLarge = false;
+
+ if (reader.getAttributeCount() > 0)
+ {
+ isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
+ }
+ reader.next();
+ if (isLarge)
+ {
+ tempFileName = UUID.randomUUID().toString() + ".tmp";
+ OutputStream out = new FileOutputStream(tempFileName);
+ while (reader.hasNext())
+ {
+ if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
+ {
+ break;
+ }
+ else
+ {
+// System.out.println("Reading " + reader.getTextLength() + " characters.");
+ String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+ String trimmedCharacters = characters.trim();
+ if (trimmedCharacters.length() > 0)
+ {
+ byte[] data = decode(trimmedCharacters);
+// System.out.println(new String(data));
+// System.out.println("Writing " + data.length + " bytes.");
+ out.write(data);
+ }
+ }
+ reader.next();
+ }
+ out.close();
+ FileInputStream fileInputStream = new FileInputStream(tempFileName);
+ BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+ ((ClientMessage) message).setBodyInputStream(bufferedInput);
+ }
+ else
+ {
+ reader.next(); // step past the "indentation" characters to get to the CDATA
+ String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+ message.getBodyBuffer().writeBytes(decode(characters.trim()));
+ }
+ }
+
private void bindQueue() throws Exception
{
String queueName = "";
@@ -309,9 +440,16 @@
}
}
- session.createQueue(address, queueName, filter, true);
+ ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
- System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+ if (!queueQuery.isExists())
+ {
+ session.createQueue(address, queueName, filter, true);
+ }
+
+ addressMap.put(queueName, address);
+
+// System.out.println("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
// Package protected ---------------------------------------------
@@ -322,7 +460,7 @@
private static byte[] decode(String data)
{
- return Base64.decode(data);
+ return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-09 17:55:33 UTC (rev 12277)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -64,12 +64,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.MessageDescribe;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
/**
* @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
@@ -118,7 +113,7 @@
return executor;
}
};
-
+
storageManager = new JournalStorageManager(config, executorFactory);
messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
@@ -140,7 +135,8 @@
XMLStreamWriter.class.getClassLoader(),
new Class[]{XMLStreamWriter.class},
handler);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
e.printStackTrace();
}
@@ -157,7 +153,7 @@
}
try
- {
+ {
// long start = System.currentTimeMillis();
XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
xmlDataWriter.writeXMLData();
@@ -194,7 +190,7 @@
messageJournal.start();
- ((JournalImpl)messageJournal).load(records, null, null, false);
+ ((JournalImpl) messageJournal).load(records, null, null, false);
for (RecordInfo info : records)
{
@@ -294,7 +290,7 @@
bindingsJournal.start();
- ((JournalImpl)bindingsJournal).load(records, null, null, false);
+ ((JournalImpl) bindingsJournal).load(records, null, null, false);
for (RecordInfo info : records)
{
@@ -456,26 +452,55 @@
{
xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
LargeServerMessage largeMessage = (LargeServerMessage) message;
+ BodyEncoder encoder = null;
try
{
- BodyEncoder encoder = largeMessage.getBodyEncoder();
+ encoder = largeMessage.getBodyEncoder();
encoder.open();
- for (long i = 0; i < encoder.getLargeBodySize(); i += XmlDataConstants.CHUNK)
+ long totalBytesWritten = 0;
+ Long bufferSize;
+ long bodySize = encoder.getLargeBodySize();
+ for (long i = 0; i < bodySize; i += XmlDataConstants.CHUNK)
{
- HornetQBuffer buffer = HornetQBuffers.fixedBuffer(XmlDataConstants.CHUNK);
- encoder.encode(buffer, XmlDataConstants.CHUNK);
- xmlWriter.writeCharacters(encode(buffer.toByteBuffer().array()));
+ Long remainder = bodySize - totalBytesWritten;
+ if (remainder >= XmlDataConstants.CHUNK)
+ {
+ bufferSize = XmlDataConstants.CHUNK;
+ }
+ else
+ {
+ bufferSize = remainder;
+ }
+ HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+ encoder.encode(buffer, bufferSize.intValue());
+ xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+ totalBytesWritten += bufferSize;
}
+ encoder.close();
}
catch (HornetQException e)
{
e.printStackTrace();
}
+ finally
+ {
+ if (encoder != null)
+ {
+ try
+ {
+ encoder.close();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
}
else
{
- xmlWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
+ xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
}
xmlWriter.writeEndElement(); // end MESSAGE_BODY
}
@@ -499,7 +524,15 @@
Object value = message.getObjectProperty(key);
xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+ if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
+ }
+ else
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+ }
+
if (value instanceof Boolean)
{
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
@@ -536,6 +569,10 @@
{
xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
}
+ else if (value instanceof byte[])
+ {
+ xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
+ }
}
xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
}
@@ -626,7 +663,7 @@
target.writeCharacters(LINE_SEPARATOR);
target.writeCharacters(indent(depth));
}
- else if ("writeEmptyElement".equals(m) || "writeCharacters".equals(m))
+ else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
{
target.writeCharacters(LINE_SEPARATOR);
target.writeCharacters(indent(depth));
Added: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java 2012-03-09 18:01:22 UTC (rev 12278)
@@ -0,0 +1,468 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import junit.framework.Assert;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.XmlDataReader;
+import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Tests exporting data to XML with XmlDataWriter and re-importing it with XmlDataReader.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class XmlImportExportTest extends ServiceTestBase
+{
+ public static final int CONSUMER_TIMEOUT = 5000;
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected void tearDown() throws Exception
+ {
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void testMessageProperties() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+ StringBuilder international = new StringBuilder();
+ for (char x = 800; x < 1200; x++)
+ {
+ international.append(x);
+ }
+
+ String special = "\"<>'&";
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.getBodyBuffer().writeString("Bob the giant pig " + i);
+ msg.putBooleanProperty("myBooleanProperty", Boolean.TRUE);
+ msg.putByteProperty("myByteProperty", new Byte("0"));
+ msg.putBytesProperty("myBytesProperty", new byte[]{0, 1, 2, 3, 4});
+ msg.putDoubleProperty("myDoubleProperty", i * 1.6);
+ msg.putFloatProperty("myFloatProperty", i * 2.5F);
+ msg.putIntProperty("myIntProperty", i);
+ msg.putLongProperty("myLongProperty", Long.MAX_VALUE - i);
+ msg.putObjectProperty("myObjectProperty", i);
+ msg.putShortProperty("myShortProperty", new Integer(i).shortValue());
+ msg.putStringProperty("myStringProperty", "myStringPropertyValue_" + i);
+ msg.putStringProperty("myNonAsciiStringProperty", international.toString());
+ msg.putStringProperty("mySpecialCharacters", special);
+ producer.send(msg);
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage msg = consumer.receive(CONSUMER_TIMEOUT);
+ byte[] body = new byte[msg.getBodySize()];
+ msg.getBodyBuffer().readBytes(body);
+ Assert.assertTrue(new String(body).contains("Bob the giant pig " + i));
+ Assert.assertEquals(msg.getBooleanProperty("myBooleanProperty"), Boolean.TRUE);
+ Assert.assertEquals(msg.getByteProperty("myByteProperty"), new Byte("0"));
+ byte[] bytes = msg.getBytesProperty("myBytesProperty");
+ for (int j = 0; j < 5; j++)
+ {
+ Assert.assertEquals(j, bytes[j]);
+ }
+ Assert.assertEquals(i * 1.6, msg.getDoubleProperty("myDoubleProperty"));
+ Assert.assertEquals(i * 2.5F, msg.getFloatProperty("myFloatProperty"));
+ Assert.assertEquals(i, msg.getIntProperty("myIntProperty").intValue());
+ Assert.assertEquals(Long.MAX_VALUE - i, msg.getLongProperty("myLongProperty").longValue());
+ Assert.assertEquals(i, msg.getObjectProperty("myObjectProperty"));
+ Assert.assertEquals(new Integer(i).shortValue(), msg.getShortProperty("myShortProperty").shortValue());
+ Assert.assertEquals("myStringPropertyValue_" + i, msg.getStringProperty("myStringProperty"));
+ Assert.assertEquals(international.toString(), msg.getStringProperty("myNonAsciiStringProperty"));
+ Assert.assertEquals(special, msg.getStringProperty("mySpecialCharacters"));
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ /**
+ * Sends one durable message of each message type, exports the journal to XML,
+ * wipes the data, re-imports the XML and checks that the messages come back
+ * from the queue in the same order with the same type.
+ *
+ * @throws Exception if any server, session, export or import call fails
+ */
+ public void testMessageTypes() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+ // one message per explicit type, followed by one created without a type
+ producer.send(session.createMessage(Message.BYTES_TYPE, true));
+ producer.send(session.createMessage(Message.DEFAULT_TYPE, true));
+ producer.send(session.createMessage(Message.MAP_TYPE, true));
+ producer.send(session.createMessage(Message.OBJECT_TYPE, true));
+ producer.send(session.createMessage(Message.STREAM_TYPE, true));
+ producer.send(session.createMessage(Message.TEXT_TYPE, true));
+ producer.send(session.createMessage(true));
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ // export the stopped server's data to an in-memory XML document
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ // start again from empty data and import the XML through a fresh session
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ // expected types, in send order; the last message was created without an explicit type
+ final byte[] expectedTypes = { Message.BYTES_TYPE,
+ Message.DEFAULT_TYPE,
+ Message.MAP_TYPE,
+ Message.OBJECT_TYPE,
+ Message.STREAM_TYPE,
+ Message.TEXT_TYPE,
+ Message.DEFAULT_TYPE };
+
+ for (int i = 0; i < expectedTypes.length; i++)
+ {
+ ClientMessage msg = consumer.receive(CONSUMER_TIMEOUT);
+ // fail with a clear assertion instead of a NullPointerException when a message is missing
+ Assert.assertNotNull("message " + i + " (expected type " + expectedTypes[i] + ") was not received", msg);
+ Assert.assertEquals(expectedTypes[i], msg.getType());
+ }
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ /**
+ * Sends a durable message with explicit expiration, priority, timestamp and
+ * user ID, exports the journal to XML, wipes the data, re-imports the XML and
+ * checks the attributes of the message received from the queue.
+ *
+ * @throws Exception if any server, session, export or import call fails
+ */
+ public void testMessageAttributes() throws Exception
+ {
+ final String QUEUE_NAME = "A1";
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+ ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+ ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);
+ msg.setExpiration(Long.MAX_VALUE);
+ msg.setPriority((byte) 0);
+ msg.setTimestamp(Long.MAX_VALUE - 1);
+ msg.setUserID(UUIDGenerator.getInstance().generateUUID());
+ producer.send(msg);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ // export the stopped server's data to an in-memory XML document
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ // start again from empty data and import the XML through a fresh session
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+ session.start();
+
+ msg = consumer.receive(CONSUMER_TIMEOUT);
+ // fail with a clear assertion instead of a NullPointerException when nothing arrives
+ Assert.assertNotNull("imported message was not received", msg);
+ Assert.assertEquals(Long.MAX_VALUE, msg.getExpiration());
+ Assert.assertEquals((byte) 0, msg.getPriority());
+ Assert.assertEquals(Long.MAX_VALUE - 1, msg.getTimestamp());
+ // only the presence of a user ID is checked here, not its value
+ Assert.assertNotNull(msg.getUserID());
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ /**
+ * Creates two queues on the same address (one plain, one with a filter and the
+ * durable flag set), exports the journal to XML, wipes the data, re-imports the
+ * XML and checks the address, filter and durability reported by a queue query.
+ *
+ * @throws Exception if any server, session, export or import call fails
+ */
+ public void testBindingAttributes() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue("addressName1", "queueName1");
+ session.createQueue("addressName1", "queueName2", "bob", true);
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ // export the stopped server's data to an in-memory XML document
+ ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+ XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+ xmlDataWriter.writeXMLData();
+ System.out.print(new String(xmlOutputStream.toByteArray()));
+
+ // start again from empty data and import the XML through a fresh session
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+ XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
+ xmlDataReader.processXml();
+
+ ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
+
+ // check existence first so a missing binding is reported as an assertion failure, not an NPE
+ Assert.assertTrue("queueName1 was not re-created by the import", queueQuery.isExists());
+ Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+ Assert.assertNull(queueQuery.getFilterString());
+
+ queueQuery = session.queueQuery(new SimpleString("queueName2"));
+
+ Assert.assertTrue("queueName2 was not re-created by the import", queueQuery.isExists());
+ Assert.assertEquals("addressName1", queueQuery.getAddress().toString());
+ Assert.assertEquals("bob", queueQuery.getFilterString().toString());
+ Assert.assertTrue(queueQuery.isDurable());
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ /**
+ * Builds a large server message of twice the default minimum large-message
+ * size, sends it to queue "A", exports the journal to XML, wipes the data,
+ * re-imports the XML and verifies the received body size and every body byte.
+ *
+ * @throws Exception if any server, session, export or import call fails
+ */
+ public void testLargeMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, false);
+
+ try
+ {
+ // body length used for writing, for the size header and for verification
+ final int bodySize = 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ LargeServerMessageImpl largeMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+
+ largeMessage.setMessageID(1005);
+ largeMessage.setDurable(true);
+
+ // the body is appended one sample byte at a time
+ for (int pos = 0; pos < bodySize; pos++)
+ {
+ largeMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(pos) });
+ }
+
+ largeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, bodySize);
+
+// fileMessage.releaseResources();
+
+ session.createQueue("A", "A");
+
+ ClientProducer producer = session.createProducer("A");
+
+ producer.send(largeMessage);
+
+ // the backing file is deleted after the send and before the commit
+ largeMessage.deleteFile();
+
+ session.commit();
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ // export the stopped server's data to an in-memory XML document
+ ByteArrayOutputStream exportedXml = new ByteArrayOutputStream();
+ new XmlDataWriter(exportedXml, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir()).writeXMLData();
+ System.out.print(new String(exportedXml.toByteArray()));
+
+ // start again from empty data and import the XML through an auto-commit session
+ clearData();
+ server.start();
+ locator = createFactory(false);
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlToImport = new ByteArrayInputStream(exportedXml.toByteArray());
+ new XmlDataReader(xmlToImport, session).processXml();
+ session.close();
+
+ // consume through a separate session created with (false, false)
+ session = factory.createSession(false, false);
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer("A");
+
+ ClientMessage received = consumer.receive(CONSUMER_TIMEOUT);
+
+ Assert.assertNotNull(received);
+
+ Assert.assertEquals(bodySize, received.getBodySize());
+
+ for (int pos = 0; pos < bodySize; pos++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(pos), received.getBodyBuffer().readByte());
+ }
+
+ received.acknowledge();
+ session.commit();
+ }
+ finally
+ {
+ session.close();
+ factory.close();
+ locator.close();
+ server.stop();
+ }
+ }
+
+ /**
+ * Routes one durable message to two queues bound to the same address, consumes
+ * and acknowledges it from the first queue only, then exports the journal to
+ * XML, wipes the data and re-imports. After the import the first queue must be
+ * empty while the second queue must still deliver the message.
+ *
+ * @throws Exception if any server, session, export or import call fails
+ */
+ public void testPartialQueue() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ server.start();
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = locator.createSessionFactory();
+ ClientSession session = factory.createSession(false, true, true);
+
+ session.createQueue("myAddress", "myQueue1");
+ session.createQueue("myAddress", "myQueue2");
+
+ ClientProducer producer = session.createProducer("myAddress");
+ producer.send(session.createMessage(true));
+
+ // consume and acknowledge the message from myQueue1 only
+ ClientConsumer consumer = session.createConsumer("myQueue1");
+ session.start();
+ ClientMessage consumedBeforeExport = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNotNull(consumedBeforeExport);
+ consumedBeforeExport.acknowledge();
+ consumer.close();
+
+ session.close();
+ locator.close();
+ server.stop();
+
+ // export the stopped server's data to an in-memory XML document
+ ByteArrayOutputStream exportedXml = new ByteArrayOutputStream();
+ new XmlDataWriter(exportedXml, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir()).writeXMLData();
+ System.out.print(new String(exportedXml.toByteArray()));
+
+ // start again from empty data and import the XML through a fresh session
+ clearData();
+ server.start();
+ locator = createInVMNonHALocator();
+ factory = locator.createSessionFactory();
+ session = factory.createSession(false, true, true);
+
+ ByteArrayInputStream xmlToImport = new ByteArrayInputStream(exportedXml.toByteArray());
+ new XmlDataReader(xmlToImport, session).processXml();
+
+ // myQueue1: nothing is expected to arrive within the timeout
+ consumer = session.createConsumer("myQueue1");
+ session.start();
+ ClientMessage fromQueue1 = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNull(fromQueue1);
+ consumer.close();
+
+ // myQueue2: the message was never consumed there, so it must be delivered
+ consumer = session.createConsumer("myQueue2");
+ ClientMessage fromQueue2 = consumer.receive(CONSUMER_TIMEOUT);
+ Assert.assertNotNull(fromQueue2);
+
+ session.close();
+ locator.close();
+ server.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the hornetq-commits
mailing list