JBoss hornetq SVN: r11256 - in branches/STOMP11/tests: stomp-tests and 16 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-08-31 00:43:40 -0400 (Wed, 31 Aug 2011)
New Revision: 11256
Added:
branches/STOMP11/tests/stomp-tests/
branches/STOMP11/tests/stomp-tests/util/
branches/STOMP11/tests/stomp-tests/util/src/
branches/STOMP11/tests/stomp-tests/util/src/test/
branches/STOMP11/tests/stomp-tests/util/src/test/java/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractClientStompFrame.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractStompClientConnection.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrame.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV10.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV11.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnection.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionFactory.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV10.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV11.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactory.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryFactory.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV10.java
branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV11.java
branches/STOMP11/tests/stomp-tests/v10/
branches/STOMP11/tests/stomp-tests/v10/src/
branches/STOMP11/tests/stomp-tests/v10/src/test/
branches/STOMP11/tests/stomp-tests/v10/src/test/java/
branches/STOMP11/tests/stomp-tests/v11/
branches/STOMP11/tests/stomp-tests/v11/src/
branches/STOMP11/tests/stomp-tests/v11/src/test/
branches/STOMP11/tests/stomp-tests/v11/src/test/java/
Log:
stomp test client
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractClientStompFrame.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractClientStompFrame.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class AbstractClientStompFrame implements ClientStompFrame
+{
+ protected static final String HEADER_RECEIPT = "receipt";
+
+ protected String command;
+ protected List<Header> headers = new ArrayList<Header>();
+ protected Set<String> headerKeys = new HashSet<String>();
+ protected String body;
+
+ public AbstractClientStompFrame(String command)
+ {
+ this.command = command;
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer("Frame: <" + command + ">" + "\n");
+ Iterator<Header> iter = headers.iterator();
+ while (iter.hasNext())
+ {
+ Header h = iter.next();
+ sb.append(h.key + ":" + h.val + "\n");
+ }
+ sb.append("\n");
+ sb.append("<body>" + body + "<body>");
+ return sb.toString();
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() throws UnsupportedEncodingException
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(command + "\n");
+ int n = headers.size();
+ for (int i = 0; i < n; i++)
+ {
+ sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n");
+ }
+ sb.append("\n");
+ sb.append(body);
+ sb.append((char)0);
+
+ String data = new String(sb.toString());
+ byte[] byteValue = data.getBytes("UTF-8");
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
+ buffer.put(byteValue);
+
+ buffer.rewind();
+ return buffer;
+ }
+
+ @Override
+ public boolean needsReply()
+ {
+ if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void setCommand(String command)
+ {
+ this.command = command;
+ }
+
+ @Override
+ public void addHeader(String key, String val)
+ {
+ headers.add(new Header(key, val));
+ headerKeys.add(key);
+ }
+
+ @Override
+ public void setBody(String body)
+ {
+ this.body = body;
+ }
+
+ private class Header
+ {
+ public String key;
+ public String val;
+
+ public Header(String key, String val)
+ {
+ this.key = key;
+ this.val = val;
+ }
+ }
+
+ @Override
+ public String getCommand()
+ {
+ return command;
+ }
+
+ @Override
+ public String getHeader(String header)
+ {
+ if (headerKeys.contains(header))
+ {
+ Iterator<Header> iter = headers.iterator();
+ while (iter.hasNext())
+ {
+ Header h = iter.next();
+ if (h.key.equals(header))
+ {
+ return h.val;
+ }
+ }
+ }
+ return null;
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractStompClientConnection.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/AbstractStompClientConnection.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public abstract class AbstractStompClientConnection implements StompClientConnection
+{
+ protected static final String CONNECT_COMMAND = "CONNECT";
+ protected static final String CONNECTED_COMMAND = "CONNECTED";
+ protected static final String DISCONNECT_COMMAND = "DISCONNECT";
+
+ protected static final String LOGIN_HEADER = "login";
+ protected static final String PASSCODE_HEADER = "passcode";
+
+
+ protected String version;
+ protected String host;
+ protected int port;
+ protected String username;
+ protected String passcode;
+ protected StompFrameFactory factory;
+ protected SocketChannel socketChannel;
+ protected ByteBuffer readBuffer;
+
+ protected List<Byte> receiveList;
+
+ protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<ClientStompFrame>();
+
+ protected boolean connected = false;
+
+ public AbstractStompClientConnection(String version, String host, int port) throws IOException
+ {
+ this.version = version;
+ this.host = host;
+ this.port = port;
+ this.factory = StompFrameFactoryFactory.getFactory(version);
+
+ initSocket();
+ }
+
+ private void initSocket() throws IOException
+ {
+ socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(true);
+ InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
+ socketChannel.connect(remoteAddr);
+
+ startReaderThread();
+ }
+
+ private void startReaderThread()
+ {
+ readBuffer = ByteBuffer.allocateDirect(10240);
+ receiveList = new ArrayList<Byte>(10240);
+
+ new ReaderThread().start();
+ }
+
+ public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ ClientStompFrame response = null;
+ ByteBuffer buffer = frame.toByteBuffer();
+ while (buffer.remaining() > 0)
+ {
+ socketChannel.write(buffer);
+ }
+
+ //now response
+ if (frame.needsReply())
+ {
+ response = receiveFrame();
+ }
+ return response;
+ }
+
+ public ClientStompFrame receiveFrame() throws InterruptedException
+ {
+ return frameQueue.poll(10, TimeUnit.SECONDS);
+ }
+
+ //put bytes to byte array.
+ private void receiveBytes(int n) throws UnsupportedEncodingException
+ {
+ readBuffer.rewind();
+ for (int i = 0; i < n; i++)
+ {
+ byte b = readBuffer.get();
+ if (b == 0)
+ {
+ //a new frame got.
+ int sz = receiveList.size();
+ if (sz > 0)
+ {
+ byte[] frameBytes = new byte[sz];
+ for (int j = 0; j < sz; j++)
+ {
+ frameBytes[j] = receiveList.get(j);
+ }
+ ClientStompFrame frame = factory.createFrame(new String(frameBytes, "UTF-8"));
+ frameQueue.offer(frame);
+
+ receiveList.clear();
+ }
+ }
+ else
+ {
+ System.out.println("Added to list: " + b);
+ receiveList.add(b);
+ }
+ }
+ //clear readbuffer
+ readBuffer.rewind();
+ }
+
+ protected void close() throws IOException
+ {
+ socketChannel.close();
+ }
+
+ private class ReaderThread extends Thread
+ {
+ public void run()
+ {
+ try
+ {
+ int n = socketChannel.read(readBuffer);
+
+ while (n >= 0)
+ {
+ System.out.println("read " + n);
+ if (n > 0)
+ {
+ receiveBytes(n);
+ }
+ n = socketChannel.read(readBuffer);
+ }
+ //peer closed
+ close();
+
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ close();
+ }
+ catch (IOException e1)
+ {
+ //ignore
+ }
+ }
+ }
+ }
+
+ public void connect() throws Exception
+ {
+ connect(null, null);
+ }
+
+ public void connect(String username, String password) throws Exception
+ {
+ throw new RuntimeException("connect method not implemented!");
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrame.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrame.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public interface ClientStompFrame
+{
+
+ public ByteBuffer toByteBuffer() throws UnsupportedEncodingException;
+
+ public boolean needsReply();
+
+ public void setCommand(String command);
+
+ public void addHeader(String string, String string2);
+
+ public void setBody(String string);
+
+ public String getCommand();
+
+ public String getHeader(String header);
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV10.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV10.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV10.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class ClientStompFrameV10 extends AbstractClientStompFrame
+{
+
+ public ClientStompFrameV10(String command)
+ {
+ super(command);
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV11.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV11.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/ClientStompFrameV11.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class ClientStompFrameV11 extends AbstractClientStompFrame
+{
+ public ClientStompFrameV11(String command)
+ {
+ super(command);
+ }
+
+ @Override
+ public boolean needsReply()
+ {
+ if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT))
+ {
+ return true;
+ }
+ return false;
+ }
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnection.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnection.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public interface StompClientConnection
+{
+ ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
+
+ ClientStompFrame receiveFrame() throws InterruptedException;
+
+ void connect() throws Exception;
+
+ void disconnect() throws IOException, InterruptedException;
+
+}
+
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionFactory.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionFactory.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionFactory.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompClientConnectionFactory
+{
+ //create a raw connection to the host.
+ public static StompClientConnection createClientConnection(String version, String host, int port) throws IOException
+ {
+ if ("1.0".equals(version))
+ {
+ return new StompClientConnectionV10(host, port);
+ }
+ if ("1.1".equals(version))
+ {
+ return new StompClientConnectionV11(host, port);
+ }
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613);
+
+ System.out.println("created a new connection: " + connection);
+
+ connection.connect();
+
+ System.out.println("connected.");
+
+ connection.disconnect();
+ System.out.println("Simple stomp client works.");
+
+ }
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV10.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV10.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * pls use factory to create frames.
+ */
+public class StompClientConnectionV10 extends AbstractStompClientConnection
+{
+
+ public StompClientConnectionV10(String host, int port) throws IOException
+ {
+ super("1.0", host, port);
+ }
+
+ public void connect(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+
+ ClientStompFrame response = this.sendFrame(frame);
+ System.out.println("Got response : " + response);
+ }
+
+ @Override
+ public void disconnect() throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ this.sendFrame(frame);
+
+ close();
+ }
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV11.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompClientConnectionV11.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompClientConnectionV11 extends AbstractStompClientConnection
+{
+ public static final String STOMP_COMMAND = "STOMP";
+
+ public static final String ACCEPT_HEADER = "accept-version";
+ public static final String HOST_HEADER = "host";
+ public static final String VERSION_HEADER = "version";
+ public static final String RECEIPT_HEADER = "receipt";
+
+ public StompClientConnectionV11(String host, int port) throws IOException
+ {
+ super("1.1", host, port);
+ }
+
+ public void connect(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ }
+
+ public void connect1(String username, String passcode) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ }
+
+ @Override
+ public void disconnect() throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ frame.addHeader(RECEIPT_HEADER, "77");
+
+ this.sendFrame(frame);
+
+ close();
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactory.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactory.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactory.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public interface StompFrameFactory
+{
+
+ ClientStompFrame createFrame(String data);
+
+ ClientStompFrame newFrame(String command);
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryFactory.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryFactory.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryFactory.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class StompFrameFactoryFactory
+{
+ public static StompFrameFactory getFactory(String version)
+ {
+ if ("1.0".equals(version))
+ {
+ return new StompFrameFactoryV10();
+ }
+
+ if ("1.1".equals(version))
+ {
+ return new StompFrameFactoryV11();
+ }
+
+ return null;
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV10.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV10.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV10.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.util.StringTokenizer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * 1.0 frames
+ *
+ * 1. CONNECT
+ * 2. CONNECTED
+ * 3. SEND
+ * 4. SUBSCRIBE
+ * 5. UNSUBSCRIBE
+ * 6. BEGIN
+ * 7. COMMIT
+ * 8. ACK
+ * 9. ABORT
+ * 10. DISCONNECT
+ * 11. MESSAGE
+ * 12. RECEIPT
+ * 13. ERROR
+ */
+public class StompFrameFactoryV10 implements StompFrameFactory
+{
+
+ public ClientStompFrame createFrame(String data)
+ {
+ System.out.println("Raw data is: " + data + "|");
+
+ //split the string at "\n\n"
+ String[] dataFields = data.split("\n\n");
+
+ StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
+
+ String command = tokenizer.nextToken();
+ ClientStompFrame frame = new ClientStompFrameV10(command);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ String header = tokenizer.nextToken();
+ String[] fields = header.split(":");
+ frame.addHeader(fields[0], fields[1]);
+ }
+
+ //body (without null byte)
+ if (dataFields.length == 2)
+ {
+ frame.setBody(dataFields[1]);
+ }
+ return frame;
+ }
+
+ @Override
+ public ClientStompFrame newFrame(String command)
+ {
+ return new ClientStompFrameV10(command);
+ }
+
+}
Added: branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV11.java
===================================================================
--- branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV11.java (rev 0)
+++ branches/STOMP11/tests/stomp-tests/util/src/test/java/org/hornetq/stomp/tests/util/client/StompFrameFactoryV11.java 2011-08-31 04:43:40 UTC (rev 11256)
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.stomp.tests.util.client;
+
+import java.util.StringTokenizer;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ * 1.1 frames
+ *
+ * 1. CONNECT/STOMP(new)
+ * 2. CONNECTED
+ * 3. SEND
+ * 4. SUBSCRIBE
+ * 5. UNSUBSCRIBE
+ * 6. BEGIN
+ * 7. COMMIT
+ * 8. ACK
+ * 9. NACK (new)
+ * 10. ABORT
+ * 11. DISCONNECT
+ * 12. MESSAGE
+ * 13. RECEIPT
+ * 14. ERROR
+ */
+public class StompFrameFactoryV11 implements StompFrameFactory
+{
+
+ @Override
+
+ public ClientStompFrame createFrame(String data)
+ {
+ //split the string at "\n\n"
+ String[] dataFields = data.split("\n\n");
+
+ StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
+
+ String command = tokenizer.nextToken();
+ ClientStompFrame frame = new ClientStompFrameV11(command);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ String header = tokenizer.nextToken();
+ String[] fields = header.split(":");
+ frame.addHeader(fields[0], fields[1]);
+ }
+
+ //body (without null byte)
+ if (dataFields.length == 2)
+ {
+ frame.setBody(dataFields[1]);
+ }
+ return frame;
+ }
+
+ @Override
+ public ClientStompFrame newFrame(String command)
+ {
+ return new ClientStompFrameV11(command);
+ }
+
+}
13 years, 6 months
JBoss hornetq SVN: r11255 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-31 00:41:19 -0400 (Wed, 31 Aug 2011)
New Revision: 11255
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-31 04:39:28 UTC (rev 11254)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-31 04:41:19 UTC (rev 11255)
@@ -2029,7 +2029,6 @@
servers[node].setIdentity("server " + node);
log.info("starting server " + servers[node]);
servers[node].start();
- Thread.sleep(100);
// for (int i = 0 ; i <= node; i++)
// {
@@ -2041,6 +2040,7 @@
log.info("started server " + node);
waitForServer(servers[node]);
+ Thread.sleep(100);
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-08-31 04:39:28 UTC (rev 11254)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2011-08-31 04:41:19 UTC (rev 11255)
@@ -19,10 +19,8 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
13 years, 6 months
JBoss hornetq SVN: r11254 - in branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core: management/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-31 00:39:28 -0400 (Wed, 31 Aug 2011)
New Revision: 11254
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-31 02:54:36 UTC (rev 11253)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-31 04:39:28 UTC (rev 11254)
@@ -1449,7 +1449,6 @@
if (nodeID != null)
{
- // TODO: calculate the time of node down
serverLocator.notifyNodeDown(System.currentTimeMillis(), msg.getNodeID().toString());
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-08-31 02:54:36 UTC (rev 11253)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2011-08-31 04:39:28 UTC (rev 11254)
@@ -242,6 +242,7 @@
try
{
clusterConnection.start();
+ clusterConnection.flushExecutor();
}
finally
{
@@ -255,6 +256,7 @@
try
{
clusterConnection.stop();
+ clusterConnection.flushExecutor();
}
finally
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-08-31 02:54:36 UTC (rev 11253)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-08-31 04:39:28 UTC (rev 11254)
@@ -46,6 +46,8 @@
void activate() throws Exception;
TransportConfiguration getConnector();
+
+ void flushExecutor();
// for debug
String describe();
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-31 02:54:36 UTC (rev 11253)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-31 04:39:28 UTC (rev 11254)
@@ -336,8 +336,6 @@
public void start() throws Exception
{
- flushExecutor();
-
synchronized (this)
{
if (started)
@@ -355,6 +353,16 @@
}
}
+
+ public void flushExecutor()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ if (!future.await(10000))
+ {
+ server.threadDump("Couldn't finish executor on " + this);
+ }
+ }
public void stop() throws Exception
{
@@ -362,8 +370,6 @@
{
return;
}
-
- flushExecutor();
if (log.isDebugEnabled())
{
@@ -1367,16 +1373,6 @@
return str.toString();
}
-
- private void flushExecutor()
- {
- Future future = new Future();
- executor.execute(future);
- if (!future.await(10000))
- {
- server.threadDump("Couldn't finish executor on " + this);
- }
- }
interface ClusterConnector
{
13 years, 6 months
JBoss hornetq SVN: r11253 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 22:54:36 -0400 (Tue, 30 Aug 2011)
New Revision: 11253
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Adding thread leaked back (on my branch at least)
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-31 02:35:45 UTC (rev 11252)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-31 02:54:36 UTC (rev 11253)
@@ -967,6 +967,8 @@
logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" +
this.getName() + "\n" + buffer.toString());
logAndSystemOut("Thread leakage");
+
+ fail("Thread leaked");
}
super.tearDown();
13 years, 6 months
JBoss hornetq SVN: r11252 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 22:35:45 -0400 (Tue, 30 Aug 2011)
New Revision: 11252
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
Log:
disabling test for now
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-08-31 01:29:55 UTC (rev 11251)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java 2011-08-31 02:35:45 UTC (rev 11252)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.cluster.reattach;
+import junit.framework.TestSuite;
+
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
@@ -30,6 +32,13 @@
*/
public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattachTest
{
+
+ // Disabled for now.. under investigation .. Clebert
+ public static TestSuite suite()
+ {
+ return new TestSuite();
+ }
+
@Override
protected void start() throws Exception
{
13 years, 6 months
JBoss hornetq SVN: r11251 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 21:29:55 -0400 (Tue, 30 Aug 2011)
New Revision: 11251
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fixing ClusterConnectionControlTest
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-30 23:22:32 UTC (rev 11250)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-31 01:29:55 UTC (rev 11251)
@@ -57,6 +57,7 @@
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;
@@ -333,20 +334,26 @@
this.clusterManagerTopology = clusterManagerTopology;
}
- public synchronized void start() throws Exception
+ public void start() throws Exception
{
- if (started)
+ flushExecutor();
+
+ synchronized (this)
{
- return;
+ if (started)
+ {
+ return;
+ }
+
+
+ started = true;
+
+ if (!backup)
+ {
+ activate();
+ }
}
- started = true;
-
- if (!backup)
- {
- activate();
- }
-
}
public void stop() throws Exception
@@ -355,6 +362,8 @@
{
return;
}
+
+ flushExecutor();
if (log.isDebugEnabled())
{
@@ -1358,6 +1367,16 @@
return str.toString();
}
+
+ private void flushExecutor()
+ {
+ Future future = new Future();
+ executor.execute(future);
+ if (!future.await(10000))
+ {
+ server.threadDump("Couldn't finish executor on " + this);
+ }
+ }
interface ClusterConnector
{
13 years, 6 months
JBoss hornetq SVN: r11250 - branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 19:22:32 -0400 (Tue, 30 Aug 2011)
New Revision: 11250
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
Log:
avoiding intermittent failure on NettyBridgeTest
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-08-30 21:00:15 UTC (rev 11249)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-08-30 23:22:32 UTC (rev 11250)
@@ -20,10 +20,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -132,7 +137,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
+
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -303,6 +311,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -330,6 +339,8 @@
Thread.sleep(1000);
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -395,6 +406,7 @@
BridgeStartTest.log.info("sent some more messages");
server1.start();
+ waitForServer(server1);
BridgeStartTest.log.info("started server1");
@@ -514,6 +526,7 @@
// Don't start server 1 yet
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
@@ -542,6 +555,8 @@
// JMSBridge should be stopped since retries = 0
server1.start();
+ waitForServer(server1);
+
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session1 = sf1.createSession(false, true, true);
@@ -665,8 +680,10 @@
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
+ waitForServer(server1);
server0.start();
+ waitForServer(server0);
locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
13 years, 6 months
JBoss hornetq SVN: r11249 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 17:00:15 -0400 (Tue, 30 Aug 2011)
New Revision: 11249
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
Log:
Ai ai ai !.. Ouch!... Typo on my code -> I was an idiot! Glad this is my branch though ;)
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 20:41:56 UTC (rev 11248)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 21:00:15 UTC (rev 11249)
@@ -193,7 +193,7 @@
{
if (uniqueEventID > currentMember.getUniqueEventID())
{
- TopologyMember newMember = new TopologyMember(currentMember.getA(), memberInput.getB());
+ TopologyMember newMember = new TopologyMember(memberInput.getA(), memberInput.getB());
if (newMember.getA() == null && currentMember.getA() != null)
{
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30 20:41:56 UTC (rev 11248)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30 21:00:15 UTC (rev 11249)
@@ -30,18 +30,17 @@
/** transient to avoid serialization changes */
private transient long uniqueEventID = System.currentTimeMillis();
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
+ public TopologyMember(final Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
- this.uniqueEventID = System.currentTimeMillis();
+ uniqueEventID = System.currentTimeMillis();
}
- public TopologyMember(TransportConfiguration a, TransportConfiguration b)
+ public TopologyMember(final TransportConfiguration a, final TransportConfiguration b)
{
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
-
public TransportConfiguration getA()
{
return connector.a;
@@ -52,14 +51,14 @@
return connector.b;
}
- public void setB(TransportConfiguration param)
+ public void setB(final TransportConfiguration param)
{
- this.connector.b = param;
+ connector.b = param;
}
- public void setA(TransportConfiguration param)
+ public void setA(final TransportConfiguration param)
{
- this.connector.a = param;
+ connector.a = param;
}
/**
@@ -69,11 +68,11 @@
{
return uniqueEventID;
}
-
+
/**
* @param uniqueEventID the uniqueEventID to set
*/
- public void setUniqueEventID(long uniqueEventID)
+ public void setUniqueEventID(final long uniqueEventID)
{
this.uniqueEventID = uniqueEventID;
}
13 years, 6 months
JBoss hornetq SVN: r11248 - in branches/Branch_2_2_EAP_cluster_clean3: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 16:41:56 -0400 (Tue, 30 Aug 2011)
New Revision: 11248
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
tweaks to my branch
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 19:54:15 UTC (rev 11247)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30 20:41:56 UTC (rev 11248)
@@ -179,14 +179,10 @@
{
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this + "::NewMemeberAdd " +
- this +
- " MEMBER WAS NULL, Add member nodeId=" +
+ Topology.log.debug(this + "::NewMemeberAdd nodeId=" +
nodeId +
" member = " +
- memberInput +
- " size = " +
- mapTopology.size(), new Exception("trace"));
+ memberInput, new Exception("trace"));
}
memberInput.setUniqueEventID(uniqueEventID);
mapTopology.put(nodeId, memberInput);
@@ -197,31 +193,30 @@
{
if (uniqueEventID > currentMember.getUniqueEventID())
{
- if (log.isDebugEnabled())
- {
- log.debug(this + "::updated currentMember=nodeID=" +
- nodeId +
- currentMember +
- " of memberInput=" +
- memberInput);
- }
-
TopologyMember newMember = new TopologyMember(currentMember.getA(), memberInput.getB());
- if (memberInput.getA() == null && memberInput.getB() != null)
+ if (newMember.getA() == null && currentMember.getA() != null)
{
- // Updating what appears to be a backup update
newMember.setA(currentMember.getA());
}
- else
- if (currentMember.getA() == null && currentMember.getB() != null && newMember.getA() != null && newMember.getB() == null)
+
+ if (newMember.getB() == null && currentMember.getB() != null)
{
- // This is a situation where we have:
- // CurrentMember (null, X) && Input(X, null)
- // This means the backup has arrived before, hence we need to merge the results
- newMember.setA(currentMember.getA());
+ newMember.setB(currentMember.getB());
}
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::updated currentMember=nodeID=" +
+ nodeId +
+ ", currentMember=" +
+ currentMember +
+ ", memberInput=" +
+ memberInput +
+ "newMember=" + newMember);
+ }
+
+
newMember.setUniqueEventID(uniqueEventID);
mapTopology.remove(nodeId);
mapTopology.put(nodeId, newMember);
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-08-30 19:54:15 UTC (rev 11247)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2011-08-30 20:41:56 UTC (rev 11248)
@@ -12,9 +12,6 @@
*/
package org.hornetq.tests.integration.jms.bridge;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
@@ -31,6 +28,11 @@
*/
public class JMSBridgeReconnectionTest extends BridgeTestBase
{
+ /**
+ *
+ */
+ private static final int TIME_WAIT = 5000;
+
private static final Logger log = Logger.getLogger(JMSBridgeReconnectionTest.class);
// Crash and reconnect
@@ -175,8 +177,6 @@
bridge.stop();
Assert.assertFalse(bridge.isStarted());
-
- // Thread.sleep(3000);
// we restart and setup the server for the test's tearDown checks
jmsServer1.start();
@@ -245,7 +245,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
@@ -337,7 +337,7 @@
// Wait a while before starting up to simulate the dest being down for a while
JMSBridgeReconnectionTest.log.info("Waiting 5 secs before bringing server back up");
- Thread.sleep(10000);
+ Thread.sleep(TIME_WAIT);
JMSBridgeReconnectionTest.log.info("Done wait");
// Restart the server
13 years, 6 months
JBoss hornetq SVN: r11247 - branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 15:54:15 -0400 (Tue, 30 Aug 2011)
New Revision: 11247
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
removing a system.out
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-30 19:53:38 UTC (rev 11246)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-08-30 19:54:15 UTC (rev 11247)
@@ -394,8 +394,6 @@
}
Queue queue = server.createQueue(address, name, filterString, durable, temporary);
-
- System.out.println("Created queue " + queue + " / address=" + address + " on " + server);
if (temporary)
{
13 years, 6 months