JBoss hornetq SVN: r11336 - in branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp: util and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-13 01:26:24 -0400 (Tue, 13 Sep 2011)
New Revision: 11336
Added:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionFactory.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactory.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryFactory.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
add tests
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class AbstractClientStompFrame implements ClientStompFrame
+{
+ protected static final String HEADER_RECEIPT = "receipt";
+
+ protected String command;
+ protected List<Header> headers = new ArrayList<Header>();
+ protected Set<String> headerKeys = new HashSet<String>();
+ protected String body;
+
+ public AbstractClientStompFrame(String command)
+ {
+ this.command = command;
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer("Frame: <" + command + ">" + "\n");
+ Iterator<Header> iter = headers.iterator();
+ while (iter.hasNext())
+ {
+ Header h = iter.next();
+ sb.append(h.key + ":" + h.val + "\n");
+ }
+ sb.append("\n");
+ sb.append("<body>" + body + "<body>");
+ return sb.toString();
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() throws UnsupportedEncodingException
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(command + "\n");
+ int n = headers.size();
+ for (int i = 0; i < n; i++)
+ {
+ sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n");
+ }
+ sb.append("\n");
+ sb.append(body);
+ sb.append((char)0);
+
+ String data = new String(sb.toString());
+ byte[] byteValue = data.getBytes("UTF-8");
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
+ buffer.put(byteValue);
+
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Override
+ public boolean needsReply()
+ {
+ if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void setCommand(String command)
+ {
+ this.command = command;
+ }
+
+ @Override
+ public void addHeader(String key, String val)
+ {
+ headers.add(new Header(key, val));
+ headerKeys.add(key);
+ }
+
+ @Override
+ public void setBody(String body)
+ {
+ this.body = body;
+ }
+
+ private class Header
+ {
+ public String key;
+ public String val;
+
+ public Header(String key, String val)
+ {
+ this.key = key;
+ this.val = val;
+ }
+ }
+
+ @Override
+ public String getCommand()
+ {
+ return command;
+ }
+
+ @Override
+ public String getHeader(String header)
+ {
+ if (headerKeys.contains(header))
+ {
+ Iterator<Header> iter = headers.iterator();
+ while (iter.hasNext())
+ {
+ Header h = iter.next();
+ if (h.key.equals(header))
+ {
+ return h.val;
+ }
+ }
+ }
+ return null;
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public abstract class AbstractStompClientConnection implements StompClientConnection
+{
+ protected static final String CONNECT_COMMAND = "CONNECT";
+ protected static final String CONNECTED_COMMAND = "CONNECTED";
+ protected static final String DISCONNECT_COMMAND = "DISCONNECT";
+
+ protected static final String LOGIN_HEADER = "login";
+ protected static final String PASSCODE_HEADER = "passcode";
+
+
+ protected String version;
+ protected String host;
+ protected int port;
+ protected String username;
+ protected String passcode;
+ protected StompFrameFactory factory;
+ protected SocketChannel socketChannel;
+ protected ByteBuffer readBuffer;
+
+ protected List<Byte> receiveList;
+
+ protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<ClientStompFrame>();
+
+ protected boolean connected = false;
+
+ public AbstractStompClientConnection(String version, String host, int port) throws IOException
+ {
+ this.version = version;
+ this.host = host;
+ this.port = port;
+ this.factory = StompFrameFactoryFactory.getFactory(version);
+
+ initSocket();
+ }
+
+ private void initSocket() throws IOException
+ {
+ socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(true);
+ InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
+ socketChannel.connect(remoteAddr);
+
+ startReaderThread();
+ }
+
+ private void startReaderThread()
+ {
+ readBuffer = ByteBuffer.allocateDirect(10240);
+ receiveList = new ArrayList<Byte>(10240);
+
+ new ReaderThread().start();
+ }
+
+ public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ ClientStompFrame response = null;
+ ByteBuffer buffer = frame.toByteBuffer();
+ while (buffer.remaining() > 0)
+ {
+ socketChannel.write(buffer);
+ }
+
+ //now response
+ if (frame.needsReply())
+ {
+ response = receiveFrame();
+ }
+ return response;
+ }
+
+ public ClientStompFrame receiveFrame() throws InterruptedException
+ {
+ return frameQueue.poll(10, TimeUnit.SECONDS);
+ }
+
+ //put bytes to byte array.
+ private void receiveBytes(int n) throws UnsupportedEncodingException
+ {
+ readBuffer.rewind();
+ for (int i = 0; i < n; i++)
+ {
+ byte b = readBuffer.get();
+ if (b == 0)
+ {
+ //a new frame got.
+ int sz = receiveList.size();
+ if (sz > 0)
+ {
+ byte[] frameBytes = new byte[sz];
+ for (int j = 0; j < sz; j++)
+ {
+ frameBytes[j] = receiveList.get(j);
+ }
+ ClientStompFrame frame = factory.createFrame(new String(frameBytes, "UTF-8"));
+ frameQueue.offer(frame);
+
+ receiveList.clear();
+ }
+ }
+ else
+ {
+ System.out.println("Added to list: " + b);
+ receiveList.add(b);
+ }
+ }
+ //clear readbuffer
+ readBuffer.rewind();
+ }
+
+ protected void close() throws IOException
+ {
+ socketChannel.close();
+ }
+
+ private class ReaderThread extends Thread
+ {
+ public void run()
+ {
+ try
+ {
+ int n = socketChannel.read(readBuffer);
+
+ while (n >= 0)
+ {
+ System.out.println("read " + n);
+ if (n > 0)
+ {
+ receiveBytes(n);
+ }
+ n = socketChannel.read(readBuffer);
+ }
+ //peer closed
+ close();
+
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ close();
+ }
+ catch (IOException e1)
+ {
+ //ignore
+ }
+ }
+ }
+ }
+
+ public void connect() throws Exception
+ {
+ connect(null, null);
+ }
+
+ public void connect(String username, String password) throws Exception
+ {
+ throw new RuntimeException("connect method not implemented!");
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public interface ClientStompFrame
+{
+
+ public ByteBuffer toByteBuffer() throws UnsupportedEncodingException;
+
+ public boolean needsReply();
+
+ public void setCommand(String command);
+
+ public void addHeader(String string, String string2);
+
+ public void setBody(String string);
+
+ public String getCommand();
+
+ public String getHeader(String header);
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class ClientStompFrameV10 extends AbstractClientStompFrame
+{
+
+ public ClientStompFrameV10(String command)
+ {
+ super(command);
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class ClientStompFrameV11 extends AbstractClientStompFrame
+{
+ public ClientStompFrameV11(String command)
+ {
+ super(command);
+ }
+
+ @Override
+ public boolean needsReply()
+ {
+ if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT))
+ {
+ return true;
+ }
+ return false;
+ }
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public interface StompClientConnection
+{
+ ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
+
+ ClientStompFrame receiveFrame() throws InterruptedException;
+
+ void connect() throws Exception;
+
+ void disconnect() throws IOException, InterruptedException;
+
+}
+
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionFactory.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionFactory.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionFactory.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompClientConnectionFactory
+{
+ //create a raw connection to the host.
+ public static StompClientConnection createClientConnection(String version, String host, int port) throws IOException
+ {
+ if ("1.0".equals(version))
+ {
+ return new StompClientConnectionV10(host, port);
+ }
+ if ("1.1".equals(version))
+ {
+ return new StompClientConnectionV11(host, port);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613);
+
+ System.out.println("created a new connection: " + connection);
+
+ connection.connect();
+
+ System.out.println("connected.");
+
+ connection.disconnect();
+ System.out.println("Simple stomp client works.");
+
+ }
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class StompClientConnectionV10 extends AbstractStompClientConnection
+{
+
+ public StompClientConnectionV10(String host, int port) throws IOException
+ {
+ super("1.0", host, port);
+ }
+
+ public void connect(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+
+ ClientStompFrame response = this.sendFrame(frame);
+ System.out.println("Got response : " + response);
+ }
+
+ @Override
+ public void disconnect() throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ this.sendFrame(frame);
+
+ close();
+ }
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompClientConnectionV11 extends AbstractStompClientConnection
+{
+ public static final String STOMP_COMMAND = "STOMP";
+
+ public static final String ACCEPT_HEADER = "accept-version";
+ public static final String HOST_HEADER = "host";
+ public static final String VERSION_HEADER = "version";
+ public static final String RECEIPT_HEADER = "receipt";
+
+ public StompClientConnectionV11(String host, int port) throws IOException
+ {
+ super("1.1", host, port);
+ }
+
+ public void connect(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ }
+
+ public void connect1(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ }
+
+ @Override
+ public void disconnect() throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ frame.addHeader(RECEIPT_HEADER, "77");
+
+ this.sendFrame(frame);
+
+ close();
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactory.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactory.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactory.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public interface StompFrameFactory
+{
+
+ ClientStompFrame createFrame(String data);
+
+ ClientStompFrame newFrame(String command);
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryFactory.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryFactory.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryFactory.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompFrameFactoryFactory
+{
+ public static StompFrameFactory getFactory(String version)
+ {
+ if ("1.0".equals(version))
+ {
+ return new StompFrameFactoryV10();
+ }
+
+ if ("1.1".equals(version))
+ {
+ return new StompFrameFactoryV11();
+ }
+
+ return null;
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV10.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.util.StringTokenizer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * 1.0 frames
+ *
+ * 1. CONNECT
+ * 2. CONNECTED
+ * 3. SEND
+ * 4. SUBSCRIBE
+ * 5. UNSUBSCRIBE
+ * 6. BEGIN
+ * 7. COMMIT
+ * 8. ACK
+ * 9. ABORT
+ * 10. DISCONNECT
+ * 11. MESSAGE
+ * 12. RECEIPT
+ * 13. ERROR
+ */
+public class StompFrameFactoryV10 implements StompFrameFactory
+{
+
+ public ClientStompFrame createFrame(String data)
+ {
+ System.out.println("Raw data is: " + data + "|");
+
+ //split the string at "\n\n"
+ String[] dataFields = data.split("\n\n");
+
+ StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
+
+ String command = tokenizer.nextToken();
+ ClientStompFrame frame = new ClientStompFrameV10(command);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ String header = tokenizer.nextToken();
+ String[] fields = header.split(":");
+ frame.addHeader(fields[0], fields[1]);
+ }
+
+ //body (without null byte)
+ if (dataFields.length == 2)
+ {
+ frame.setBody(dataFields[1]);
+ }
+ return frame;
+ }
+
+ @Override
+ public ClientStompFrame newFrame(String command)
+ {
+ return new ClientStompFrameV10(command);
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.util;
+
+import java.util.StringTokenizer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * 1.1 frames
+ *
+ * 1. CONNECT/STOMP(new)
+ * 2. CONNECTED
+ * 3. SEND
+ * 4. SUBSCRIBE
+ * 5. UNSUBSCRIBE
+ * 6. BEGIN
+ * 7. COMMIT
+ * 8. ACK
+ * 9. NACK (new)
+ * 10. ABORT
+ * 11. DISCONNECT
+ * 12. MESSAGE
+ * 13. RECEIPT
+ * 14. ERROR
+ */
+public class StompFrameFactoryV11 implements StompFrameFactory
+{
+
+ @Override
+
+ public ClientStompFrame createFrame(String data)
+ {
+ //split the string at "\n\n"
+ String[] dataFields = data.split("\n\n");
+
+ StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
+
+ String command = tokenizer.nextToken();
+ ClientStompFrame frame = new ClientStompFrameV11(command);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ String header = tokenizer.nextToken();
+ String[] fields = header.split(":");
+ frame.addHeader(fields[0], fields[1]);
+ }
+
+ //body (without null byte)
+ if (dataFields.length == 2)
+ {
+ frame.setBody(dataFields[1]);
+ }
+ return frame;
+ }
+
+ @Override
+ public ClientStompFrame newFrame(String command)
+ {
+ return new ClientStompFrameV11(command);
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.v11;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.UnitTestCase;
+
+public abstract class StompTestBase2 extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompTestBase2.class);
+
+ protected String hostname = "127.0.0.1";
+
+ protected int port = 61613;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ protected Session session;
+
+ protected Queue queue;
+
+ protected Topic topic;
+
+ protected JMSServerManager server;
+
+ protected String defUser = "brianm";
+
+ protected String defPass = "wombats";
+
+
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ protected JMSServerManager createServer() throws Exception
+ {
+ Configuration config = createBasicConfig();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config, defUser, defPass);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(new InVMContext());
+ return server;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected String getQueueName()
+ {
+ return "test";
+ }
+
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
+ }
+
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
+
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg, queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+ }
+
+ public void sendMessage(byte[] data, Destination destination) throws Exception
+ {
+ sendMessage(data, "foo", "xyz", destination);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
+ }
+
+ public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+}
Added: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java (rev 0)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 05:26:24 UTC (rev 11336)
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.v11;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+
+public class StompTestV11 extends StompTestBase2
+{
+ private static final transient Logger log = Logger.getLogger(StompTestV11.class);
+
+ public void testConnection() throws Exception
+ {
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+}
13 years, 3 months
JBoss hornetq SVN: r11335 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-13 00:21:16 -0400 (Tue, 13 Sep 2011)
New Revision: 11335
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
Log:
fix tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-13 01:17:00 UTC (rev 11334)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-13 04:21:16 UTC (rev 11335)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
@@ -25,7 +26,13 @@
private List<Header> headers = new ArrayList<Header>(10);
private String body;
+ private VersionedStompFrameHandler handler;
+ public HornetQStompException(StompConnection connection, String msg)
+ {
+ super(msg);
+ handler = connection.getFrameHandler();
+ }
public HornetQStompException(String msg)
{
super(msg);
@@ -53,7 +60,32 @@
public StompFrame getFrame()
{
- return null;
+ StompFrame frame = null;
+ if (handler == null)
+ {
+ frame = new StompFrame("ERROR");
+ frame.addHeader("message", this.getMessage());
+ if (body != null)
+ {
+ try
+ {
+ frame.setByteBody(body.getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ }
+ }
+ else
+ {
+ frame.setByteBody(new byte[0]);
+ }
+ }
+ else
+ {
+ frame = handler.createStompFrame("ERROR");
+ frame.addHeader("message", this.getMessage());
+ }
+ return frame;
}
private class Header
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-13 01:17:00 UTC (rev 11334)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-13 04:21:16 UTC (rev 11335)
@@ -559,10 +559,6 @@
{
manager.commitTransaction(this, txID);
}
- catch (HornetQStompException e)
- {
- throw e;
- }
catch (Exception e)
{
throw new HornetQStompException("Error committing " + txID, e);
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-13 01:17:00 UTC (rev 11334)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-13 04:21:16 UTC (rev 11335)
@@ -386,9 +386,11 @@
public void beginTransaction(StompConnection connection, String txID) throws Exception
{
+ log.error("-------------------------------begin tx: " + txID);
if (transactedSessions.containsKey(txID))
{
- throw new HornetQStompException("Transaction already started: " + txID);
+ log.error("------------------------------Error, tx already exist!");
+ throw new HornetQStompException(connection, "Transaction already started: " + txID);
}
// create the transacted session
getTransactedSession(connection, txID);
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-13 01:17:00 UTC (rev 11334)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-13 04:21:16 UTC (rev 11335)
@@ -336,13 +336,16 @@
int size = bodyPos - buffer.readerIndex();
buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
byte[] data = new byte[size];
+
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
+ log.error("------------------- server message is byte");
frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
}
else
{
+ log.error("------------------- server message is text");
SimpleString text = buffer.readNullableSimpleString();
if (text != null)
{
@@ -353,6 +356,8 @@
data = new byte[0];
}
}
+ frame.setByteBody(data);
+
serverMessage.getBodyBuffer().resetReaderIndex();
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
13 years, 3 months
JBoss hornetq SVN: r11334 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-12 21:17:00 -0400 (Mon, 12 Sep 2011)
New Revision: 11334
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
Log:
tweak
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java 2011-09-12 22:32:45 UTC (rev 11333)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java 2011-09-13 01:17:00 UTC (rev 11334)
@@ -105,7 +105,6 @@
HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
- // messagingService = createServer(true, config, 10024, 20024, settings);
messagingService = createServer(true, config, 10024, 200024, settings);
messagingService.start();
13 years, 3 months
JBoss hornetq SVN: r11333 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-12 18:32:45 -0400 (Mon, 12 Sep 2011)
New Revision: 11333
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
Log:
tweaks on discovery
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-12 20:59:11 UTC (rev 11332)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-12 22:32:45 UTC (rev 11333)
@@ -1401,6 +1401,13 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
+
+ if (topology != null && topology.getMember(entry.getNodeID()) == null)
+ {
+ TopologyMember member = new TopologyMember(entry.getConnector(), null);
+ // on this case we set it as zero as any update coming from server should be accepted
+ topology.updateMember(0, entry.getNodeID(), member);
+ }
}
if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-12 20:59:11 UTC (rev 11332)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-12 22:32:45 UTC (rev 11333)
@@ -23,7 +23,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.logging.Logger;
@@ -246,33 +245,36 @@
log.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
}
- execute(new Runnable()
+ if (copy.size() > 0)
{
- public void run()
+ execute(new Runnable()
{
- for (ClusterTopologyListener listener : copy)
+ public void run()
{
- if (Topology.log.isTraceEnabled())
+ for (ClusterTopologyListener listener : copy)
{
- Topology.log.trace(Topology.this + " informing " +
- listener +
- " about node up = " +
- nodeId +
- " connector = " +
- memberToSend.getConnector());
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace(Topology.this + " informing " +
+ listener +
+ " about node up = " +
+ nodeId +
+ " connector = " +
+ memberToSend.getConnector());
+ }
+
+ try
+ {
+ listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
-
- try
- {
- listener.nodeUP(uniqueEventID, nodeId, memberToSend.getConnector(), false);
- }
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- }
}
- }
- });
+ });
+ }
}
/**
13 years, 3 months
JBoss hornetq SVN: r11332 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-12 16:59:11 -0400 (Mon, 12 Sep 2011)
New Revision: 11332
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
trying to fix discovery tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-12 18:07:09 UTC (rev 11331)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-09-12 20:59:11 UTC (rev 11332)
@@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -66,7 +67,7 @@
private final Object waitLock = new Object();
- private final Map<String, DiscoveryEntry> connectors = new HashMap<String, DiscoveryEntry>();
+ private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<String, DiscoveryEntry>();
private final long timeout;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12 18:07:09 UTC (rev 11331)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12 20:59:11 UTC (rev 11332)
@@ -1657,7 +1657,7 @@
-1,
groupAddress,
port,
- 1000,
+ 200,
connectorPairs);
configuration.getBroadcastGroupConfigurations().add(bcConfig);
13 years, 3 months
JBoss hornetq SVN: r11331 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-12 14:07:09 -0400 (Mon, 12 Sep 2011)
New Revision: 11331
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
changing registry to be a concurrent hashmap (fixing a test that failed)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-12 16:52:08 UTC (rev 11330)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-09-12 18:07:09 UTC (rev 11331)
@@ -15,7 +15,14 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.management.MBeanServer;
@@ -137,7 +144,7 @@
managementAddress = configuration.getManagementAddress();
managementNotificationAddress = configuration.getManagementNotificationAddress();
- registry = new HashMap<String, Object>();
+ registry = new ConcurrentHashMap<String, Object>();
broadcaster = new NotificationBroadcasterSupport();
notificationsEnabled = true;
objectNameBuilder = ObjectNameBuilder.create(configuration.getJMXDomain());
13 years, 3 months