Author: jfrederic.clere(a)jboss.com
Date: 2007-11-01 09:54:20 -0400 (Thu, 01 Nov 2007)
New Revision: 332
Added:
sandbox/tomcat/tomcat6/README
Modified:
sandbox/tomcat/tomcat6/cluster.patch
Log:
Also remove the cluster tests.
Added: sandbox/tomcat/tomcat6/README
===================================================================
--- sandbox/tomcat/tomcat6/README (rev 0)
+++ sandbox/tomcat/tomcat6/README 2007-11-01 13:54:20 UTC (rev 332)
@@ -0,0 +1,3 @@
+patch.build.patch: Arrange build.
+cluster.patch: remove cluster and documentation.
+nio.patch: remove nio and documentation.
Modified: sandbox/tomcat/tomcat6/cluster.patch
===================================================================
--- sandbox/tomcat/tomcat6/cluster.patch 2007-11-01 13:15:22 UTC (rev 331)
+++ sandbox/tomcat/tomcat6/cluster.patch 2007-11-01 13:54:20 UTC (rev 332)
@@ -35184,3 +35184,5271 @@
#base.path=C:/path/to/the/repository
#base.path=/usr/local
+Index: test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java (working copy)
+@@ -1,41 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- */
+-
+-package org.apache.catalina.tribes.test.interceptors;
+-
+-import junit.framework.TestCase;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestTwoPhaseCommit extends TestCase {
+-
+- protected void setUp() throws Exception {
+- super.setUp();
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (working copy)
+@@ -1,111 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.interceptors;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+-import junit.framework.TestCase;
+-import junit.framework.TestResult;
+-import junit.framework.TestSuite;
+-
+-public class TestNonBlockingCoordinator extends TestCase {
+-
+- GroupChannel[] channels = null;
+- NonBlockingCoordinator[] coordinators = null;
+- int channelCount = 10;
+- Thread[] threads = null;
+- protected void setUp() throws Exception {
+- System.out.println("Setup");
+- super.setUp();
+- channels = new GroupChannel[channelCount];
+- coordinators = new NonBlockingCoordinator[channelCount];
+- threads = new Thread[channelCount];
+- for ( int i=0; i<channelCount; i++ ) {
+- channels[i] = new GroupChannel();
+- coordinators[i] = new NonBlockingCoordinator();
+- channels[i].addInterceptor(coordinators[i]);
+- channels[i].addInterceptor(new TcpFailureDetector());
+- final int j = i;
+- threads[i] = new Thread() {
+- public void run() {
+- try {
+- channels[j].start(Channel.DEFAULT);
+- Thread.sleep(50);
+- } catch (Exception x) {
+- x.printStackTrace();
+- }
+- }
+- };
+- }
+- for ( int i=0; i<channelCount; i++ ) threads[i].start();
+- for ( int i=0; i<channelCount; i++ ) threads[i].join();
+- Thread.sleep(1000);
+- }
+-
+- public void testCoord1() throws Exception {
+- for (int i=1; i<channelCount; i++ )
+- assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length);
+- Member member = coordinators[0].getCoordinator();
+- int cnt = 0;
+- while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){}
+- for (int i=0; i<channelCount; i++ ) super.assertEquals(member,coordinators[i].getCoordinator());
+- System.out.println("Coordinator[1] is:"+member);
+-
+- }
+-
+- public void testCoord2() throws Exception {
+- Member member = coordinators[1].getCoordinator();
+- System.out.println("Coordinator[2a] is:" + member);
+- int index = -1;
+- for ( int i=0; i<channelCount; i++ ) {
+- if ( channels[i].getLocalMember(false).equals(member) ) {
+- System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
+- channels[i].stop(Channel.DEFAULT);
+- index = i;
+- }
+- }
+- int dead = index;
+- Thread.sleep(1000);
+- if ( index == 0 ) index = 1; else index = 0;
+- System.out.println("Member count:"+channels[index].getMembers().length);
+- member = coordinators[index].getCoordinator();
+- for (int i = 1; i < channelCount; i++) if ( i != dead ) super.assertEquals(member, coordinators[i].getCoordinator());
+- System.out.println("Coordinator[2b] is:" + member);
+- }
+-
+- protected void tearDown() throws Exception {
+- System.out.println("tearDown");
+- super.tearDown();
+- for ( int i=0; i<channelCount; i++ ) {
+- channels[i].stop(Channel.DEFAULT);
+- }
+- }
+-
+- public static void main(String[] args) throws Exception {
+- TestSuite suite = new TestSuite();
+- suite.addTestSuite(TestNonBlockingCoordinator.class);
+- suite.run(new TestResult());
+- }
+-
+-
+-
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java (working copy)
+@@ -1,186 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.interceptors;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+-import junit.framework.TestCase;
+-import junit.framework.TestResult;
+-import junit.framework.TestSuite;
+-import org.apache.catalina.tribes.ChannelListener;
+-import java.io.Serializable;
+-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+-import org.apache.catalina.tribes.ChannelMessage;
+-import org.apache.catalina.tribes.group.InterceptorPayload;
+-import org.apache.catalina.tribes.ChannelException;
+-import java.util.concurrent.atomic.AtomicInteger;
+-
+-public class TestOrderInterceptor extends TestCase {
+-
+- GroupChannel[] channels = null;
+- OrderInterceptor[] orderitcs = null;
+- MangleOrderInterceptor[] mangleitcs = null;
+- TestListener[] test = null;
+- int channelCount = 2;
+- Thread[] threads = null;
+- protected void setUp() throws Exception {
+- System.out.println("Setup");
+- super.setUp();
+- channels = new GroupChannel[channelCount];
+- orderitcs = new OrderInterceptor[channelCount];
+- mangleitcs = new MangleOrderInterceptor[channelCount];
+- test = new TestListener[channelCount];
+- threads = new Thread[channelCount];
+- for ( int i=0; i<channelCount; i++ ) {
+- channels[i] = new GroupChannel();
+-
+- orderitcs[i] = new OrderInterceptor();
+- mangleitcs[i] = new MangleOrderInterceptor();
+- orderitcs[i].setExpire(Long.MAX_VALUE);
+- channels[i].addInterceptor(orderitcs[i]);
+- channels[i].addInterceptor(mangleitcs[i]);
+- test[i] = new TestListener(i);
+- channels[i].addChannelListener(test[i]);
+- final int j = i;
+- threads[i] = new Thread() {
+- public void run() {
+- try {
+- channels[j].start(Channel.DEFAULT);
+- Thread.sleep(50);
+- } catch (Exception x) {
+- x.printStackTrace();
+- }
+- }
+- };
+- }
+- for ( int i=0; i<channelCount; i++ ) threads[i].start();
+- for ( int i=0; i<channelCount; i++ ) threads[i].join();
+- Thread.sleep(1000);
+- }
+-
+- public void testOrder1() throws Exception {
+- Member[] dest = channels[0].getMembers();
+- final AtomicInteger value = new AtomicInteger(0);
+- for ( int i=0; i<100; i++ ) {
+- channels[0].send(dest,new Integer(value.getAndAdd(1)),0);
+- }
+- Thread.sleep(5000);
+- for ( int i=0; i<test.length; i++ ) {
+- super.assertEquals(false,test[i].fail);
+- }
+- }
+-
+- public void testOrder2() throws Exception {
+- final Member[] dest = channels[0].getMembers();
+- final AtomicInteger value = new AtomicInteger(0);
+- Runnable run = new Runnable() {
+- public void run() {
+- for (int i = 0; i < 100; i++) {
+- try {
+- synchronized (channels[0]) {
+- channels[0].send(dest, new Integer(value.getAndAdd(1)), 0);
+- }
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- assertEquals(true,false);
+- }
+- }
+- }
+- };
+- Thread[] threads = new Thread[5];
+- for (int i=0;i<threads.length;i++) {
+- threads[i] = new Thread(run);
+- }
+- for (int i=0;i<threads.length;i++) {
+- threads[i].start();
+- }
+- for (int i=0;i<threads.length;i++) {
+- threads[i].join();
+- }
+- Thread.sleep(5000);
+- for ( int i=0; i<test.length; i++ ) {
+- super.assertEquals(false,test[i].fail);
+- }
+- }
+-
+-
+- protected void tearDown() throws Exception {
+- System.out.println("tearDown");
+- super.tearDown();
+- for ( int i=0; i<channelCount; i++ ) {
+- channels[i].stop(Channel.DEFAULT);
+- }
+- }
+-
+- public static void main(String[] args) throws Exception {
+- TestSuite suite = new TestSuite();
+- suite.addTestSuite(TestOrderInterceptor.class);
+- suite.run(new TestResult());
+- }
+-
+- public static class TestListener implements ChannelListener {
+- int id = -1;
+- public TestListener(int id) {
+- this.id = id;
+- }
+- int cnt = 0;
+- int total = 0;
+- boolean fail = false;
+- public synchronized void messageReceived(Serializable msg, Member sender) {
+- total++;
+- Integer i = (Integer)msg;
+- if ( i.intValue() != cnt ) fail = true;
+- else cnt++;
+- System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail);
+-
+- }
+-
+- public boolean accept(Serializable msg, Member sender) {
+- return (msg instanceof Integer);
+- }
+- }
+-
+- public static class MangleOrderInterceptor extends ChannelInterceptorBase {
+- int cnt = 1;
+- ChannelMessage hold = null;
+- Member[] dest = null;
+- public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+- if ( hold == null ) {
+- //System.out.println("Skipping message:"+msg);
+- hold = (ChannelMessage)msg.deepclone();
+- dest = new Member[destination.length];
+- System.arraycopy(destination,0,dest,0,dest.length);
+- } else {
+- //System.out.println("Sending message:"+msg);
+- super.sendMessage(destination,msg,payload);
+- //System.out.println("Sending message:"+hold);
+- super.sendMessage(dest,hold,null);
+- hold = null;
+- dest = null;
+- }
+- }
+- }
+-
+-
+-
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/test/TribesTestSuite.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/TribesTestSuite.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/TribesTestSuite.java (working copy)
+@@ -1,41 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test;
+-
+-import junit.framework.Test;
+-import junit.framework.TestCase;
+-import junit.framework.TestSuite;
+-
+-public class TribesTestSuite
+- extends TestCase {
+-
+- public TribesTestSuite(String s) {
+- super(s);
+- }
+-
+- public static Test suite() {
+- TestSuite suite = new TestSuite();
+- suite.addTestSuite(org.apache.catalina.tribes.test.channel.ChannelStartStop.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestChannelOptionFlag.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class);
+- suite.addTestSuite(org.apache.catalina.tribes.test.interceptors.TestOrderInterceptor.class);
+- return suite;
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/TestNioSender.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/TestNioSender.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/TestNioSender.java (working copy)
+@@ -1,120 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test;
+-
+-import java.io.IOException;
+-import java.nio.channels.SelectionKey;
+-import java.util.Iterator;
+-import java.nio.channels.Selector;
+-import org.apache.catalina.tribes.transport.nio.NioSender;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import org.apache.catalina.tribes.io.ChannelData;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.Channel;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestNioSender {
+- private Selector selector = null;
+- private int counter = 0;
+- MemberImpl mbr;
+- private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
+- public TestNioSender() {
+-
+- }
+-
+- public synchronized int inc() {
+- return ++counter;
+- }
+-
+- public synchronized ChannelData getMessage(Member mbr) {
+- String msg = new String("Thread-"+Thread.currentThread().getName()+" Message:"+inc());
+- ChannelData data = new ChannelData(true);
+- data.setMessage(new XByteBuffer(msg.getBytes(),false));
+- data.setAddress(mbr);
+-
+- return data;
+- }
+-
+- public void init() throws Exception {
+- selector = Selector.open();
+- mbr = new MemberImpl("localhost",4444,0);
+- NioSender sender = new NioSender();
+- sender.setDestination(mbr);
+- sender.setDirectBuffer(true);
+- sender.setSelector(selector);
+- sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+- sender.connect();
+- }
+-
+- public void run() {
+- while (true) {
+-
+- int selectedKeys = 0;
+- try {
+- selectedKeys = selector.select(100);
+- // if ( selectedKeys == 0 ) {
+- // System.out.println("No registered interests. Sleeping for a second.");
+- // Thread.sleep(100);
+- } catch (Exception e) {
+- e.printStackTrace();
+- continue;
+- }
+-
+- if (selectedKeys == 0) {
+- continue;
+- }
+-
+- Iterator it = selector.selectedKeys().iterator();
+- while (it.hasNext()) {
+- SelectionKey sk = (SelectionKey) it.next();
+- it.remove();
+- try {
+- int readyOps = sk.readyOps();
+- sk.interestOps(sk.interestOps() & ~readyOps);
+- NioSender sender = (NioSender) sk.attachment();
+- if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) {
+- System.out.println("Message completed for handler:"+sender);
+- Thread.currentThread().sleep(2000);
+- sender.reset();
+- sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+- }
+-
+-
+- } catch (Throwable t) {
+- t.printStackTrace();
+- return;
+- }
+- }
+- }
+- }
+-
+- public static void main(String[] args) throws Exception {
+- TestNioSender sender = new TestNioSender();
+- sender.init();
+- sender.run();
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/io/TestSenderConnections.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/io/TestSenderConnections.java (working copy)
+@@ -1,128 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.io;
+-
+-import java.util.ArrayList;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import junit.framework.TestCase;
+-import org.apache.catalina.tribes.ChannelListener;
+-import java.io.Serializable;
+-import java.util.Random;
+-import java.util.HashMap;
+-import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+-
+-public class TestSenderConnections extends TestCase {
+- private static int count = 2;
+- private ManagedChannel[] channels = new ManagedChannel[count];
+- private TestMsgListener[] listeners = new TestMsgListener[count];
+-
+- protected void setUp() throws Exception {
+- super.setUp();
+- for (int i = 0; i < channels.length; i++) {
+- channels[i] = new GroupChannel();
+- channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+- listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
+- channels[i].addChannelListener(listeners[i]);
+- channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
+-
+- }
+- }
+-
+- public void clear() {
+- }
+-
+- public void sendMessages(long delay, long sleep) throws Exception {
+- Member local = channels[0].getLocalMember(true);
+- Member dest = channels[1].getLocalMember(true);
+- int n = 3;
+- System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]");
+- for (int i = 0; i < n; i++) {
+- channels[0].send(new Member[] {dest}, new TestMsg(), 0);
+- if ( delay > 0 ) Thread.sleep(delay);
+- }
+- System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections");
+- if ( sleep > 0 ) Thread.sleep(sleep);
+-
+- }
+-
+- public void testConnectionLinger() throws Exception {
+- sendMessages(0,15000);
+- }
+-
+- public void testKeepAliveCount() throws Exception {
+- System.out.println("Setting keep alive count to 0");
+- for (int i = 0; i < channels.length; i++) {
+- ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+- t.getTransport().setKeepAliveCount(0);
+- }
+- sendMessages(1000,15000);
+- }
+-
+- public void testKeepAliveTime() throws Exception {
+- System.out.println("Setting keep alive count to 1 second");
+- for (int i = 0; i < channels.length; i++) {
+- ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+- t.getTransport().setKeepAliveTime(1000);
+- }
+- sendMessages(2000,15000);
+- }
+-
+- protected void tearDown() throws Exception {
+- for (int i = 0; i < channels.length; i++) {
+- channels[i].stop(Channel.DEFAULT);
+- }
+-
+- }
+-
+- public static class TestMsg implements Serializable {
+- static Random r = new Random(System.currentTimeMillis());
+- HashMap map = new HashMap();
+- public TestMsg() {
+- int size = Math.abs(r.nextInt() % 200);
+- for (int i=0; i<size; i++ ) {
+- int length = Math.abs(r.nextInt() %65000);
+- ArrayList list = new ArrayList(length);
+- map.put(new Integer(i),list);
+- }
+- }
+- }
+-
+- public class TestMsgListener implements ChannelListener {
+- public String name = null;
+- public TestMsgListener(String name) {
+- this.name = name;
+- }
+-
+- public void messageReceived(Serializable msg, Member sender) {
+- System.out.println("["+name+"] Received message:"+msg+" from " + sender.getName());
+- }
+-
+-
+- public boolean accept(Serializable msg, Member sender) {
+- return true;
+- }
+-
+-
+-
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/io/TestSerialization.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/io/TestSerialization.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/io/TestSerialization.java (working copy)
+@@ -1,40 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.io;
+-
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import junit.framework.TestCase;
+-
+-public class TestSerialization extends TestCase {
+- protected void setUp() throws Exception {
+- super.setUp();
+- }
+-
+- public void testEmptyArray() throws Exception {
+-
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- }
+-
+- public static void main(String[] args) throws Exception {
+- //XByteBuffer.deserialize(new byte[0]);
+- XByteBuffer.deserialize(new byte[] {-84, -19, 0, 5, 115, 114, 0, 17, 106});
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (working copy)
+@@ -1,191 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.channel;
+-
+-import junit.framework.TestCase;
+-import java.io.Serializable;
+-import java.util.Random;
+-import java.util.Arrays;
+-import org.apache.catalina.tribes.ChannelListener;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
+-import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+-import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestDataIntegrity extends TestCase {
+- int msgCount = 500;
+- int threadCount = 20;
+- GroupChannel channel1;
+- GroupChannel channel2;
+- Listener listener1;
+- int threadCounter = 0;
+- protected void setUp() throws Exception {
+- super.setUp();
+- channel1 = new GroupChannel();
+- channel1.addInterceptor(new MessageDispatch15Interceptor());
+- channel2 = new GroupChannel();
+- channel2.addInterceptor(new MessageDispatch15Interceptor());
+- listener1 = new Listener();
+- channel2.addChannelListener(listener1);
+- channel1.start(GroupChannel.DEFAULT);
+- channel2.start(GroupChannel.DEFAULT);
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- channel1.stop(GroupChannel.DEFAULT);
+- channel2.stop(GroupChannel.DEFAULT);
+- }
+-
+- public void testDataSendNO_ACK() throws Exception {
+- System.err.println("Starting NO_ACK");
+- Thread[] threads = new Thread[threadCount];
+- for (int x=0; x<threads.length; x++ ) {
+- threads[x] = new Thread() {
+- public void run() {
+- try {
+- long start = System.currentTimeMillis();
+- for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
+- System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- return;
+- } finally {
+- threadCounter++;
+- }
+- }
+- };
+- }
+- for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+- for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+- //sleep for 50 sec, let the other messages in
+- long start = System.currentTimeMillis();
+- while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
+- System.err.println("Finished NO_ACK ["+listener1.count+"]");
+- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
+- }
+-
+- public void testDataSendASYNCM() throws Exception {
+- System.err.println("Starting ASYNC MULTI THREAD");
+- Thread[] threads = new Thread[threadCount];
+- for (int x=0; x<threads.length; x++ ) {
+- threads[x] = new Thread() {
+- public void run() {
+- try {
+- long start = System.currentTimeMillis();
+- for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+- System.out.println("Thread["+this.getName()+"] sent "+msgCount+" messages in "+(System.currentTimeMillis()-start)+" ms.");
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- return;
+- } finally {
+- threadCounter++;
+- }
+- }
+- };
+- }
+- for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+- for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+- //sleep for 50 sec, let the other messages in
+- long start = System.currentTimeMillis();
+- while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
+- System.err.println("Finished ASYNC MULTI THREAD ["+listener1.count+"]");
+- assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
+- }
+- public void testDataSendASYNC() throws Exception {
+- System.err.println("Starting ASYNC");
+- for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+- //sleep for 50 sec, let the other messages in
+- long start = System.currentTimeMillis();
+- while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);
+- System.err.println("Finished ASYNC");
+- assertEquals("Checking success messages.",msgCount,listener1.count);
+- }
+-
+- public void testDataSendACK() throws Exception {
+- System.err.println("Starting ACK");
+- for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
+- Thread.sleep(250);
+- System.err.println("Finished ACK");
+- assertEquals("Checking success messages.",msgCount,listener1.count);
+- }
+-
+- public void testDataSendSYNCACK() throws Exception {
+- System.err.println("Starting SYNC_ACK");
+- for (int i=0; i<msgCount; i++) channel1.send(new Member[]
{channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+- Thread.sleep(250);
+- System.err.println("Finished SYNC_ACK");
+- assertEquals("Checking success messages.",msgCount,listener1.count);
+- }
+-
+- public static class Listener implements ChannelListener {
+- long count = 0;
+- public boolean accept(Serializable s, Member m) {
+- return (s instanceof Data);
+- }
+-
+- public void messageReceived(Serializable s, Member m) {
+- Data d = (Data)s;
+- if ( !Data.verify(d) ) {
+- System.err.println("ERROR");
+- } else {
+- count++;
+- if ((count %1000) ==0 ) {
+- System.err.println("SUCCESS:"+count);
+- }
+- }
+- }
+- }
+-
+- public static class Data implements Serializable {
+- public int length;
+- public byte[] data;
+- public byte key;
+- public static Random r = new Random(System.currentTimeMillis());
+- public static Data createRandomData() {
+- int i = r.nextInt();
+- i = ( i % 127 );
+- int length = Math.abs(r.nextInt() % 65555);
+- Data d = new Data();
+- d.length = length;
+- d.key = (byte)i;
+- d.data = new byte[length];
+- Arrays.fill(d.data,d.key);
+- return d;
+- }
+-
+- public static boolean verify(Data d) {
+- boolean result = (d.length == d.data.length);
+- for ( int i=0; result && (i<d.data.length); i++ ) result = result
&& d.data[i] == d.key;
+- return result;
+- }
+- }
+-
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
+===================================================================
+---
test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java (working
copy)
+@@ -1,137 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.channel;
+-
+-import junit.framework.TestCase;
+-import java.io.Serializable;
+-import java.util.Random;
+-import java.util.Arrays;
+-import org.apache.catalina.tribes.ChannelListener;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import java.io.PrintStream;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestRemoteProcessException extends TestCase {
+- int msgCount = 10000;
+- GroupChannel channel1;
+- GroupChannel channel2;
+- Listener listener1;
+- protected void setUp() throws Exception {
+- super.setUp();
+- channel1 = new GroupChannel();
+- channel2 = new GroupChannel();
+- listener1 = new Listener();
+- channel2.addChannelListener(listener1);
+- channel1.start(GroupChannel.DEFAULT);
+- channel2.start(GroupChannel.DEFAULT);
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- channel1.stop(GroupChannel.DEFAULT);
+- channel2.stop(GroupChannel.DEFAULT);
+- }
+-
+- public void testDataSendSYNCACK() throws Exception {
+- System.err.println("Starting SYNC_ACK");
+- int errC=0, nerrC=0;
+- for (int i=0; i<msgCount; i++) {
+- boolean error = Data.r.nextBoolean();
+-
channel1.send(channel1.getMembers(),Data.createRandomData(error),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+- if ( error ) errC++; else nerrC++;
+- }
+- System.err.println("Finished SYNC_ACK");
+- assertEquals("Checking failure messages.",errC,listener1.errCnt);
+- assertEquals("Checking success messages.",nerrC,listener1.noErrCnt);
+- assertEquals("Checking all
messages.",msgCount,listener1.noErrCnt+listener1.errCnt);
+- System.out.println("Listener 1 stats:");
+- listener1.printStats(System.out);
+- }
+-
+- public static class Listener implements ChannelListener {
+- long noErrCnt = 0;
+- long errCnt = 0;
+- public boolean accept(Serializable s, Member m) {
+- return (s instanceof Data);
+- }
+-
+- public void messageReceived(Serializable s, Member m) {
+- Data d = (Data)s;
+- if ( !Data.verify(d) ) {
+- System.err.println("ERROR");
+- } else {
+- if (d.error) {
+- errCnt++;
+- if ( (errCnt % 100) == 0) {
+- printStats(System.err);
+- }
+- throw new IllegalArgumentException();
+- } else {
+- noErrCnt++;
+- if ( (noErrCnt % 100) == 0) {
+- printStats(System.err);
+- }
+- }
+- }
+- }
+-
+- public void printStats(PrintStream stream) {
+- stream.println("NORMAL:" + noErrCnt);
+- stream.println("FAILURES:" + errCnt);
+- stream.println("TOTAL:" + (errCnt+noErrCnt));
+- }
+- }
+-
+- public static class Data implements Serializable {
+- public int length;
+- public byte[] data;
+- public byte key;
+- public boolean error = false;
+- public static Random r = new Random(System.currentTimeMillis());
+- public static Data createRandomData(boolean error) {
+- int i = r.nextInt();
+- i = ( i % 127 );
+- int length = Math.abs(r.nextInt() % 65555);
+- Data d = new Data();
+- d.length = length;
+- d.key = (byte)i;
+- d.data = new byte[length];
+- Arrays.fill(d.data,d.key);
+- d.error = error;
+- return d;
+- }
+-
+- public static boolean verify(Data d) {
+- boolean result = (d.length == d.data.length);
+- for ( int i=0; result && (i<d.data.length); i++ ) result = result
&& d.data[i] == d.key;
+- return result;
+- }
+- }
+-
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java (working
copy)
+@@ -1,90 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.channel;
+-
+-import junit.framework.*;
+-import org.apache.catalina.tribes.group.*;
+-import org.apache.catalina.tribes.ChannelInterceptor;
+-import org.apache.catalina.tribes.ChannelException;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestChannelOptionFlag extends TestCase {
+- GroupChannel channel = null;
+- protected void setUp() throws Exception {
+- super.setUp();
+- channel = new GroupChannel();
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- if ( channel != null ) try {channel.stop(channel.DEFAULT);}catch ( Exception
ignore) {}
+- channel = null;
+- }
+-
+-
+- public void testOptionConflict() throws Exception {
+- boolean error = false;
+- channel.setOptionCheck(true);
+- ChannelInterceptor i = new TestInterceptor();
+- i.setOptionFlag(128);
+- channel.addInterceptor(i);
+- i = new TestInterceptor();
+- i.setOptionFlag(128);
+- channel.addInterceptor(i);
+- try {
+- channel.start(channel.DEFAULT);
+- }catch ( ChannelException x ) {
+- if ( x.getMessage().indexOf("option flag conflict") >= 0 )
error = true;
+- }
+- assertEquals(true,error);
+- }
+-
+- public void testOptionNoConflict() throws Exception {
+- boolean error = false;
+- channel.setOptionCheck(true);
+- ChannelInterceptor i = new TestInterceptor();
+- i.setOptionFlag(128);
+- channel.addInterceptor(i);
+- i = new TestInterceptor();
+- i.setOptionFlag(64);
+- channel.addInterceptor(i);
+- i = new TestInterceptor();
+- i.setOptionFlag(256);
+- channel.addInterceptor(i);
+- try {
+- channel.start(channel.DEFAULT);
+- }catch ( ChannelException x ) {
+- if ( x.getMessage().indexOf("option flag conflict") >= 0 )
error = true;
+- }
+- assertEquals(false,error);
+- }
+-
+- public static class TestInterceptor extends ChannelInterceptorBase {
+-
+- }
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java (working copy)
+@@ -1,126 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- */
+-package org.apache.catalina.tribes.test.channel;
+-
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import junit.framework.TestCase;
+-import org.apache.catalina.tribes.transport.ReceiverBase;
+-
+-/**
+- * @author Filip Hanik
+- * @version 1.0
+- */
+-public class ChannelStartStop extends TestCase {
+- GroupChannel channel = null;
+- protected void setUp() throws Exception {
+- super.setUp();
+- channel = new GroupChannel();
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
+- }
+-
+- public void testDoubleFullStart() throws Exception {
+- int count = 0;
+- try {
+- channel.start(channel.DEFAULT);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(channel.DEFAULT);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- assertEquals(count,2);
+- channel.stop(channel.DEFAULT);
+- }
+-
+- public void testScrap() throws Exception {
+- System.out.println(channel.getChannelReceiver().getClass());
+- ((ReceiverBase)channel.getChannelReceiver()).setMaxThreads(1);
+- }
+-
+-
+- public void testDoublePartialStart() throws Exception {
+- //try to double start the RX
+- int count = 0;
+- try {
+- channel.start(channel.SND_RX_SEQ);
+- channel.start(channel.MBR_RX_SEQ);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(channel.MBR_RX_SEQ);
+- count++;
+- } catch ( Exception x){/*expected*/}
+- assertEquals(count,1);
+- channel.stop(channel.DEFAULT);
+- //double the membership sender
+- count = 0;
+- try {
+- channel.start(channel.SND_RX_SEQ);
+- channel.start(channel.MBR_TX_SEQ);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(channel.MBR_TX_SEQ);
+- count++;
+- } catch ( Exception x){/*expected*/}
+- assertEquals(count,1);
+- channel.stop(channel.DEFAULT);
+-
+- count = 0;
+- try {
+- channel.start(channel.SND_RX_SEQ);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(channel.SND_RX_SEQ);
+- count++;
+- } catch ( Exception x){/*expected*/}
+- assertEquals(count,1);
+- channel.stop(channel.DEFAULT);
+-
+- count = 0;
+- try {
+- channel.start(channel.SND_TX_SEQ);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(channel.SND_TX_SEQ);
+- count++;
+- } catch ( Exception x){/*expected*/}
+- assertEquals(count,1);
+- channel.stop(channel.DEFAULT);
+- }
+-
+- public void testFalseOption() throws Exception {
+- int flag = 0xFFF0;//should get ignored by the underlying components
+- int count = 0;
+- try {
+- channel.start(flag);
+- count++;
+- } catch ( Exception x){x.printStackTrace();}
+- try {
+- channel.start(flag);
+- count++;
+- } catch ( Exception x){/*expected*/}
+- assertEquals(count,2);
+- channel.stop(channel.DEFAULT);
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/membership/MemberSerialization.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/membership/MemberSerialization.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/membership/MemberSerialization.java (working
copy)
+@@ -1,99 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- */
+-package org.apache.catalina.tribes.test.membership;
+-
+-import junit.framework.TestCase;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import java.util.Arrays;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class MemberSerialization extends TestCase {
+- MemberImpl m1, m2, p1,p2;
+- byte[] payload = null;
+- protected void setUp() throws Exception {
+- super.setUp();
+- payload = new byte[333];
+- Arrays.fill(payload,(byte)1);
+- m1 = new MemberImpl("localhost",3333,1,payload);
+- m2 = new MemberImpl("localhost",3333,1);
+- payload = new byte[333];
+- Arrays.fill(payload,(byte)2);
+- p1 = new MemberImpl("127.0.0.1",3333,1,payload);
+- p2 = new MemberImpl("localhost",3331,1,payload);
+- m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+- m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+- m1.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+- m2.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+- }
+-
+- public void testCompare() throws Exception {
+- assertTrue(m1.equals(m2));
+- assertTrue(m2.equals(m1));
+- assertTrue(p1.equals(m2));
+- assertFalse(m1.equals(p2));
+- assertFalse(m1.equals(p2));
+- assertFalse(m2.equals(p2));
+- assertFalse(p1.equals(p2));
+- }
+-
+- public void testSerializationOne() throws Exception {
+- MemberImpl m = m1;
+- byte[] md1 = m.getData(false,true);
+- byte[] mda1 = m.getData(false,false);
+- assertTrue(Arrays.equals(md1,mda1));
+- assertTrue(md1==mda1);
+- mda1 = m.getData(true,true);
+- MemberImpl ma1 = MemberImpl.getMember(mda1);
+- assertTrue(compareMembers(m,ma1));
+- mda1 = p1.getData(false);
+- assertFalse(Arrays.equals(md1,mda1));
+- ma1 = MemberImpl.getMember(mda1);
+- assertTrue(compareMembers(p1,ma1));
+-
+- md1 = m.getData(true,true);
+- Thread.sleep(50);
+- mda1 = m.getData(true,true);
+- MemberImpl a1 = MemberImpl.getMember(md1);
+- MemberImpl a2 = MemberImpl.getMember(mda1);
+- assertTrue(a1.equals(a2));
+- assertFalse(Arrays.equals(md1,mda1));
+-
+-
+- }
+-
+- public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
+- boolean result = true;
+- result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
+- result = result &&
Arrays.equals(impl1.getPayload(),impl2.getPayload());
+- result = result &&
Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId());
+- result = result && impl1.getPort() == impl2.getPort();
+- return result;
+- }
+-
+- protected void tearDown() throws Exception {
+- super.tearDown();
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java (working
copy)
+@@ -1,117 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.membership;
+-
+-import java.util.ArrayList;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import junit.framework.TestCase;
+-
+-public class TestMemberArrival
+- extends TestCase {
+- private static int count = 10;
+- private ManagedChannel[] channels = new ManagedChannel[count];
+- private TestMbrListener[] listeners = new TestMbrListener[count];
+-
+- protected void setUp() throws Exception {
+- super.setUp();
+- for (int i = 0; i < channels.length; i++) {
+- channels[i] = new GroupChannel();
+- channels[i].getMembershipService().setPayload( ("Channel-" + (i +
1)).getBytes("ASCII"));
+- listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+- channels[i].addMembershipListener(listeners[i]);
+-
+- }
+- }
+-
+- public void clear() {
+- for (int i = 0; i < channels.length; i++) {
+- listeners[i].members.clear();
+- }
+- }
+-
+- public void testMemberArrival() throws Exception {
+- //purpose of this test is to make sure that we have received all the members
+- //that we can expect before the start method returns
+- Thread[] threads = new Thread[channels.length];
+- for (int i=0; i<channels.length; i++ ) {
+- final Channel channel = channels[i];
+- Thread t = new Thread() {
+- public void run() {
+- try {
+- channel.start(Channel.DEFAULT);
+- }catch ( Exception x ) {
+- throw new RuntimeException(x);
+- }
+- }
+- };
+- threads[i] = t;
+- }
+- for (int i=0; i<threads.length; i++ ) threads[i].start();
+- for (int i=0; i<threads.length; i++ ) threads[i].join();
+- Thread.sleep(2000);
+- System.out.println("All channels started.");
+- for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member
arrival length",channels.length-1,listeners[i].members.size());
+- }
+-
+- protected void tearDown() throws Exception {
+-
+- for (int i = 0; i < channels.length; i++) {
+- try {
+- channels[i].stop(Channel.DEFAULT);
+- } catch (Exception ignore) {}
+- }
+- super.tearDown();
+- }
+-
+- public class TestMbrListener
+- implements MembershipListener {
+- public String name = null;
+- public TestMbrListener(String name) {
+- this.name = name;
+- }
+-
+- public ArrayList members = new ArrayList();
+- public void memberAdded(Member member) {
+- if (!members.contains(member)) {
+- members.add(member);
+- try {
+- System.out.println(name + ":member added[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+- } catch (Exception x) {
+- System.out.println(name + ":member added[unknown]");
+- }
+- }
+- }
+-
+- public void memberDisappeared(Member member) {
+- if (members.contains(member)) {
+- members.remove(member);
+- try {
+- System.out.println(name + ":member disappeared[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+- } catch (Exception x) {
+- System.out.println(name + ":member
disappeared[unknown]");
+- }
+- }
+- }
+-
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java (working
copy)
+@@ -1,165 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.membership;
+-
+-import java.util.ArrayList;
+-
+-import org.apache.catalina.tribes.ByteMessage;
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ChannelException;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+-import junit.framework.TestCase;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class TestTcpFailureDetector extends TestCase {
+- private TcpFailureDetector tcpFailureDetector1 = null;
+- private TcpFailureDetector tcpFailureDetector2 = null;
+- private ManagedChannel channel1 = null;
+- private ManagedChannel channel2 = null;
+- private TestMbrListener mbrlist1 = null;
+- private TestMbrListener mbrlist2 = null;
+- protected void setUp() throws Exception {
+- super.setUp();
+- channel1 = new GroupChannel();
+- channel2 = new GroupChannel();
+-
channel1.getMembershipService().setPayload("Channel-1".getBytes("ASCII"));
+-
channel2.getMembershipService().setPayload("Channel-2".getBytes("ASCII"));
+- mbrlist1 = new TestMbrListener("Channel-1");
+- mbrlist2 = new TestMbrListener("Channel-2");
+- tcpFailureDetector1 = new TcpFailureDetector();
+- tcpFailureDetector2 = new TcpFailureDetector();
+- channel1.addInterceptor(tcpFailureDetector1);
+- channel2.addInterceptor(tcpFailureDetector2);
+- channel1.addMembershipListener(mbrlist1);
+- channel2.addMembershipListener(mbrlist2);
+- }
+-
+- public void clear() {
+- mbrlist1.members.clear();
+- mbrlist2.members.clear();
+- }
+-
+- public void testTcpSendFailureMemberDrop() throws Exception {
+- System.out.println("testTcpSendFailureMemberDrop()");
+- clear();
+- channel1.start(channel1.DEFAULT);
+- channel2.start(channel2.DEFAULT);
+- //Thread.sleep(1000);
+- assertEquals("Expecting member count to be
equal",mbrlist1.members.size(),mbrlist2.members.size());
+- channel2.stop(channel2.SND_RX_SEQ);
+- ByteMessage msg = new ByteMessage(new byte[1024]);
+- try {
+- channel1.send(channel1.getMembers(), msg, 0);
+- assertEquals("Message send should have failed.",true,false);
+- } catch ( ChannelException x ) {
+-
+- }
+- assertEquals("Expecting member count to not be
equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+- channel1.stop(Channel.DEFAULT);
+- channel2.stop(Channel.DEFAULT);
+- }
+-
+- public void testTcpFailureMemberAdd() throws Exception {
+- System.out.println("testTcpFailureMemberAdd()");
+- clear();
+- channel1.start(channel1.DEFAULT);
+- channel2.start(channel2.SND_RX_SEQ);
+- channel2.start(channel2.SND_TX_SEQ);
+- channel2.start(channel2.MBR_RX_SEQ);
+- channel2.stop(channel2.SND_RX_SEQ);
+- channel2.start(channel2.MBR_TX_SEQ);
+- //Thread.sleep(1000);
+- assertEquals("Expecting member count to not be
equal",mbrlist1.members.size()+1,mbrlist2.members.size());
+- channel1.stop(Channel.DEFAULT);
+- channel2.stop(Channel.DEFAULT);
+- }
+-
+- public void testTcpMcastFail() throws Exception {
+- System.out.println("testTcpMcastFail()");
+- clear();
+- channel1.start(channel1.DEFAULT);
+- channel2.start(channel2.DEFAULT);
+- //Thread.sleep(1000);
+- assertEquals("Expecting member count to be
equal",mbrlist1.members.size(),mbrlist2.members.size());
+- channel2.stop(channel2.MBR_TX_SEQ);
+- ByteMessage msg = new ByteMessage(new byte[1024]);
+- try {
+- Thread.sleep(5000);
+- assertEquals("Expecting member count to be
equal",mbrlist1.members.size(),mbrlist2.members.size());
+- channel1.send(channel1.getMembers(), msg, 0);
+- } catch ( ChannelException x ) {
+- assertEquals("Message send should have succeeded.",true,false);
+- }
+- channel1.stop(Channel.DEFAULT);
+- channel2.stop(Channel.DEFAULT);
+- }
+-
+-
+- protected void tearDown() throws Exception {
+- tcpFailureDetector1 = null;
+- tcpFailureDetector2 = null;
+- try { channel1.stop(Channel.DEFAULT);}catch (Exception ignore){}
+- channel1 = null;
+- try { channel2.stop(Channel.DEFAULT);}catch (Exception ignore){}
+- channel2 = null;
+- super.tearDown();
+- }
+-
+- public class TestMbrListener implements MembershipListener {
+- public String name = null;
+- public TestMbrListener(String name) {
+- this.name = name;
+- }
+- public ArrayList members = new ArrayList();
+- public void memberAdded(Member member) {
+- if ( !members.contains(member) ) {
+- members.add(member);
+- try{
+- System.out.println(name + ":member added[" + new
String(member.getPayload(), "ASCII") + "]");
+- }catch ( Exception x ) {
+- System.out.println(name + ":member added[unknown]");
+- }
+- }
+- }
+-
+- public void memberDisappeared(Member member) {
+- if ( members.contains(member) ) {
+- members.remove(member);
+- try{
+- System.out.println(name + ":member disappeared[" + new
String(member.getPayload(), "ASCII") + "]");
+- }catch ( Exception x ) {
+- System.out.println(name + ":member
disappeared[unknown]");
+- }
+- }
+- }
+-
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java (working copy)
+@@ -1,120 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.membership;
+-
+-import java.util.ArrayList;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import junit.framework.TestCase;
+-import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+-import org.apache.catalina.tribes.util.UUIDGenerator;
+-
+-public class TestDomainFilter
+- extends TestCase {
+- private static int count = 10;
+- private ManagedChannel[] channels = new ManagedChannel[count];
+- private TestMbrListener[] listeners = new TestMbrListener[count];
+-
+- protected void setUp() throws Exception {
+- super.setUp();
+- for (int i = 0; i < channels.length; i++) {
+- channels[i] = new GroupChannel();
+- channels[i].getMembershipService().setPayload( ("Channel-" + (i +
1)).getBytes("ASCII"));
+- listeners[i] = new TestMbrListener( ("Listener-" + (i + 1)));
+- channels[i].addMembershipListener(listeners[i]);
+- DomainFilterInterceptor filter = new DomainFilterInterceptor();
+- filter.setDomain(UUIDGenerator.randomUUID(false));
+- channels[i].addInterceptor(filter);
+- }
+- }
+-
+- public void clear() {
+- for (int i = 0; i < channels.length; i++) {
+- listeners[i].members.clear();
+- }
+- }
+-
+- public void testMemberArrival() throws Exception {
+- //purpose of this test is to make sure that we have received all the members
+- //that we can expect before the start method returns
+- Thread[] threads = new Thread[channels.length];
+- for (int i=0; i<channels.length; i++ ) {
+- final Channel channel = channels[i];
+- Thread t = new Thread() {
+- public void run() {
+- try {
+- channel.start(Channel.DEFAULT);
+- }catch ( Exception x ) {
+- throw new RuntimeException(x);
+- }
+- }
+- };
+- threads[i] = t;
+- }
+- for (int i=0; i<threads.length; i++ ) threads[i].start();
+- for (int i=0; i<threads.length; i++ ) threads[i].join();
+- System.out.println("All channels started.");
+- for (int i=listeners.length-1; i>=0; i-- ) assertEquals("Checking member
arrival length",0,listeners[i].members.size());
+- }
+-
+- protected void tearDown() throws Exception {
+-
+- for (int i = 0; i < channels.length; i++) {
+- try {
+- channels[i].stop(Channel.DEFAULT);
+- } catch (Exception ignore) {}
+- }
+- super.tearDown();
+- }
+-
+- public class TestMbrListener
+- implements MembershipListener {
+- public String name = null;
+- public TestMbrListener(String name) {
+- this.name = name;
+- }
+-
+- public ArrayList members = new ArrayList();
+- public void memberAdded(Member member) {
+- if (!members.contains(member)) {
+- members.add(member);
+- try {
+- System.out.println(name + ":member added[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+- } catch (Exception x) {
+- System.out.println(name + ":member added[unknown]");
+- }
+- }
+- }
+-
+- public void memberDisappeared(Member member) {
+- if (members.contains(member)) {
+- members.remove(member);
+- try {
+- System.out.println(name + ":member disappeared[" + new
String(member.getPayload(), "ASCII") + ";
Thread:"+Thread.currentThread().getName()+"]");
+- } catch (Exception x) {
+- System.out.println(name + ":member
disappeared[unknown]");
+- }
+- }
+- }
+-
+- }
+-
+-}
+Index: test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java (working copy)
+@@ -1,92 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.text.DecimalFormat;
+-
+-import org.apache.catalina.tribes.ChannelMessage;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MessageListener;
+-import org.apache.catalina.tribes.io.ChannelData;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import org.apache.catalina.tribes.transport.nio.NioReceiver;
+-
+-public class SocketNioReceive {
+- static int count = 0;
+- static int accept = 0;
+- static long start = 0;
+- static double mb = 0;
+- static int len = 0;
+- static DecimalFormat df = new DecimalFormat("##.00");
+- static double seconds = 0;
+-
+- protected static Object mutex = new Object();
+- public static void main(String[] args) throws Exception {
+- Member mbr = new MemberImpl("localhost", 9999, 0);
+- ChannelData data = new ChannelData();
+- data.setAddress(mbr);
+- byte[] buf = new byte[8192 * 4];
+- data.setMessage(new XByteBuffer(buf, false));
+- buf = XByteBuffer.createDataPackage(data);
+- len = buf.length;
+- NioReceiver receiver = new NioReceiver();
+- receiver.setPort(9999);
+- receiver.setHost("localhost");
+- MyList list = new MyList();
+- receiver.setMessageListener(list);
+- receiver.start();
+- System.out.println("Listening on 9999");
+- while (true) {
+- try {
+- synchronized (mutex) {
+- mutex.wait(5000);
+- if ( start != 0 ) {
+- System.out.println("Throughput " + df.format(mb /
seconds) + " MB/seconds, messages "+count+" accepts "+accept+",
total "+mb+" MB.");
+- }
+- }
+- }catch (Throwable x) {
+- x.printStackTrace();
+- }
+- }
+- }
+-
+- public static class MyList implements MessageListener {
+- boolean first = true;
+-
+-
+- public void messageReceived(ChannelMessage msg) {
+- if (first) {
+- first = false;
+- start = System.currentTimeMillis();
+- }
+- mb += ( (double) len) / 1024 / 1024;
+- synchronized (this) {count++;}
+- if ( ( (count) % 10000) == 0) {
+- long time = System.currentTimeMillis();
+- seconds = ( (double) (time - start)) / 1000;
+- System.out.println("Throughput " + df.format(mb / seconds) +
" MB/seconds, messages "+count+", total "+mb+" MB.");
+- }
+- }
+-
+- public boolean accept(ChannelMessage msg) {
+- synchronized (this) {accept++;}
+- return true;
+- }
+-
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java (working
copy)
+@@ -1,106 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.io.OutputStream;
+-import java.net.Socket;
+-import java.text.DecimalFormat;
+-import org.apache.catalina.tribes.transport.nio.NioSender;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import java.nio.channels.Selector;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.Member;
+-import java.nio.channels.SelectionKey;
+-import java.util.Iterator;
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.io.ChannelData;
+-import java.math.BigDecimal;
+-import java.util.Arrays;
+-
+-public class SocketNioValidateSend {
+-
+- public static void main(String[] args) throws Exception {
+- Selector selector = Selector.open();
+- Member mbr = new MemberImpl("localhost", 9999, 0);
+- byte seq = 0;
+- byte[] buf = new byte[50000];
+- Arrays.fill(buf,seq);
+- int len = buf.length;
+- BigDecimal total = new BigDecimal((double)0);
+- BigDecimal bytes = new BigDecimal((double)len);
+- NioSender sender = new NioSender();
+- sender.setDestination(mbr);
+- sender.setDirectBuffer(true);
+- sender.setSelector(selector);
+- sender.connect();
+- sender.setMessage(buf);
+- System.out.println("Writing to 9999");
+- long start = 0;
+- double mb = 0;
+- boolean first = true;
+- int count = 0;
+-
+- DecimalFormat df = new DecimalFormat("##.00");
+- while (count<100000) {
+- if (first) {
+- first = false;
+- start = System.currentTimeMillis();
+- }
+- sender.setMessage(buf);
+- int selectedKeys = 0;
+- try {
+- selectedKeys = selector.select(0);
+- } catch (Exception e) {
+- e.printStackTrace();
+- continue;
+- }
+-
+- if (selectedKeys == 0) {
+- continue;
+- }
+-
+- Iterator it = selector.selectedKeys().iterator();
+- while (it.hasNext()) {
+- SelectionKey sk = (SelectionKey) it.next();
+- it.remove();
+- try {
+- int readyOps = sk.readyOps();
+- sk.interestOps(sk.interestOps() & ~readyOps);
+- if (sender.process(sk, false)) {
+- total = total.add(bytes);
+- sender.reset();
+- seq++;
+- Arrays.fill(buf,seq);
+- sender.setMessage(buf);
+- mb += ( (double) len) / 1024 / 1024;
+- if ( ( (++count) % 10000) == 0) {
+- long time = System.currentTimeMillis();
+- double seconds = ( (double) (time - start)) / 1000;
+- System.out.println("Throughput " + df.format(mb /
seconds) + " MB/seconds, total "+mb+" MB, total "+total+"
bytes.");
+- }
+- }
+-
+- } catch (Throwable t) {
+- t.printStackTrace();
+- return;
+- }
+- }
+- }
+- System.out.println("Complete, sleeping 15 seconds");
+- Thread.sleep(15000);
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/transport/SocketSend.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketSend.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketSend.java (working copy)
+@@ -1,69 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.io.OutputStream;
+-import java.net.Socket;
+-import java.text.DecimalFormat;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.io.ChannelData;
+-import org.apache.catalina.tribes.Channel;
+-import java.math.BigDecimal;
+-
+-public class SocketSend {
+-
+- public static void main(String[] args) throws Exception {
+-
+-
+- Member mbr = new MemberImpl("localhost", 9999, 0);
+- ChannelData data = new ChannelData();
+- data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
+- data.setAddress(mbr);
+- byte[] buf = new byte[8192 * 4];
+- data.setMessage(new XByteBuffer(buf,false));
+- buf = XByteBuffer.createDataPackage(data);
+- int len = buf.length;
+- System.out.println("Message size:"+len+" bytes");
+- BigDecimal total = new BigDecimal((double)0);
+- BigDecimal bytes = new BigDecimal((double)len);
+- Socket socket = new Socket("localhost",9999);
+- System.out.println("Writing to 9999");
+- OutputStream out = socket.getOutputStream();
+- long start = 0;
+- double mb = 0;
+- boolean first = true;
+- int count = 0;
+- DecimalFormat df = new DecimalFormat("##.00");
+- while ( count<1000000 ) {
+- if ( first ) { first = false; start = System.currentTimeMillis();}
+- out.write(buf,0,buf.length);
+- mb += ( (double) buf.length) / 1024 / 1024;
+- total = total.add(bytes);
+- if ( ((++count) % 10000) == 0 ) {
+- long time = System.currentTimeMillis();
+- double seconds = ((double)(time-start))/1000;
+- System.out.println("Throughput "+df.format(mb/seconds)+"
MB/seconds messages "+count+", total "+mb+" MB, total
"+total+" bytes.");
+- }
+- }
+- out.flush();
+- System.out.println("Complete, sleeping 5 seconds");
+- Thread.sleep(5000);
+-
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java (working
copy)
+@@ -1,87 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.net.ServerSocket;
+-import java.net.Socket;
+-import java.io.InputStream;
+-import java.text.DecimalFormat;
+-import java.math.BigDecimal;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-
+-public class SocketTribesReceive {
+- static long start = 0;
+- static double mb = 0;
+- //static byte[] buf = new byte[32871];
+- static byte[] buf = new byte[32871];
+- static boolean first = true;
+- static int count = 0;
+- static DecimalFormat df = new DecimalFormat("##.00");
+- static BigDecimal total = new BigDecimal((double)0);
+- static BigDecimal bytes = new BigDecimal((double)32871);
+-
+-
+- public static void main(String[] args) throws Exception {
+- int size = 43800;
+- if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception
x){}
+- XByteBuffer xbuf = new XByteBuffer(43800,true);
+- ServerSocket srvSocket = new ServerSocket(9999);
+- System.out.println("Listening on 9999");
+- Socket socket = srvSocket.accept();
+- socket.setReceiveBufferSize(size);
+- InputStream in = socket.getInputStream();
+- Thread t = new Thread() {
+- public void run() {
+- while ( true ) {
+- try {
+- Thread.sleep(1000);
+- printStats(start, mb, count, df, total);
+- }catch ( Exception x ) {}
+- }
+- }
+- };
+- t.setDaemon(true);
+- t.start();
+-
+- while ( true ) {
+- if ( first ) { first = false; start = System.currentTimeMillis();}
+- int len = in.read(buf);
+- if ( len == -1 ) {
+- printStats(start, mb, count, df, total);
+- System.exit(1);
+- }
+- xbuf.append(buf,0,len);
+- if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+- total = total.add(bytes);
+- while ( xbuf.countPackages(true) > 0 ) {
+- xbuf.extractPackage(true);
+- count++;
+- }
+- mb += ( (double) len) / 1024 / 1024;
+- if ( ((count) % 10000) == 0 ) {
+- printStats(start, mb, count, df, total);
+- }
+- }
+-
+- }
+-
+- private static void printStats(long start, double mb, int count, DecimalFormat df,
BigDecimal total) {
+- long time = System.currentTimeMillis();
+- double seconds = ((double)(time-start))/1000;
+- System.out.println("Throughput "+df.format(mb/seconds)+"
MB/seconds messages "+count+", total "+mb+" MB, total
"+total+" bytes.");
+- }
+-}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java (revision
590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java (working
copy)
+@@ -1,107 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.net.ServerSocket;
+-import java.net.Socket;
+-import java.io.InputStream;
+-import java.text.DecimalFormat;
+-import java.math.BigDecimal;
+-
+-public class SocketValidateReceive {
+- static long start = 0;
+- static double mb = 0;
+- static byte[] buf = new byte[8192 * 4];
+- static boolean first = true;
+- static int count = 0;
+- static DecimalFormat df = new DecimalFormat("##.00");
+- static BigDecimal total = new BigDecimal(0);
+- static BigDecimal bytes = new BigDecimal(32871);
+-
+-
+- public static void main(String[] args) throws Exception {
+- int size = 43800;
+- if (args.length > 0 ) try {size=Integer.parseInt(args[0]);}catch(Exception
x){}
+-
+- ServerSocket srvSocket = new ServerSocket(9999);
+- System.out.println("Listening on 9999");
+- Socket socket = srvSocket.accept();
+- socket.setReceiveBufferSize(size);
+- InputStream in = socket.getInputStream();
+- MyDataReader reader = new MyDataReader(50000);
+- Thread t = new Thread() {
+- public void run() {
+- while ( true ) {
+- try {
+- Thread.sleep(1000);
+- printStats(start, mb, count, df, total);
+- }catch ( Exception x ) {}
+- }
+- }
+- };
+- t.setDaemon(true);
+- t.start();
+-
+- while ( true ) {
+- if ( first ) { first = false; start = System.currentTimeMillis();}
+- int len = in.read(buf);
+- if ( len == -1 ) {
+- printStats(start, mb, count, df, total);
+- System.exit(1);
+- }
+- count += reader.append(buf,0,len);
+-
+- if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+- total = total.add(bytes);
+- mb += ( (double) len) / 1024 / 1024;
+- if ( ((count) % 10000) == 0 ) {
+- printStats(start, mb, count, df, total);
+- }
+- }
+-
+- }
+-
+- private static void printStats(long start, double mb, int count, DecimalFormat df,
BigDecimal total) {
+- long time = System.currentTimeMillis();
+- double seconds = ((double)(time-start))/1000;
+- System.out.println("Throughput "+df.format(mb/seconds)+"
MB/seconds messages "+count+", total "+mb+" MB, total
"+total+" bytes.");
+- }
+-
+- public static class MyDataReader {
+- byte[] data = new byte[43800];
+- int length = 10;
+- int cur = 0;
+- byte seq = 0;
+- public MyDataReader(int len) {
+- length = len;
+- }
+-
+- public int append(byte[] b, int off, int len) throws Exception {
+- int packages = 0;
+- for ( int i=off; i<len; i++ ) {
+- if ( cur == length ) {
+- cur = 0;
+- seq++;
+- packages++;
+- }
+- if ( b[i] != seq ) throw new Exception("mismatch on
seq:"+seq+" and byte nr:"+cur+" count:"+count+"
packages:"+packages);
+- cur++;
+- }
+- return packages;
+- }
+- }
+-}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/test/transport/SocketNioSend.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketNioSend.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketNioSend.java (working copy)
+@@ -1,107 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.io.OutputStream;
+-import java.net.Socket;
+-import java.text.DecimalFormat;
+-import org.apache.catalina.tribes.transport.nio.NioSender;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import java.nio.channels.Selector;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.Member;
+-import java.nio.channels.SelectionKey;
+-import java.util.Iterator;
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.io.ChannelData;
+-import java.math.BigDecimal;
+-
+-public class SocketNioSend {
+-
+- public static void main(String[] args) throws Exception {
+- Selector selector = Selector.open();
+- Member mbr = new MemberImpl("localhost", 9999, 0);
+- ChannelData data = new ChannelData();
+- data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
+- data.setAddress(mbr);
+- byte[] buf = new byte[8192 * 4];
+- data.setMessage(new XByteBuffer(buf,false));
+- buf = XByteBuffer.createDataPackage(data);
+- int len = buf.length;
+- BigDecimal total = new BigDecimal((double)0);
+- BigDecimal bytes = new BigDecimal((double)len);
+- NioSender sender = new NioSender();
+- sender.setDestination(mbr);
+- sender.setDirectBuffer(true);
+- sender.setSelector(selector);
+- sender.setTxBufSize(1024*1024);
+- sender.connect();
+- sender.setMessage(buf);
+- System.out.println("Writing to 9999");
+- long start = 0;
+- double mb = 0;
+- boolean first = true;
+- int count = 0;
+- DecimalFormat df = new DecimalFormat("##.00");
+- while (count<100000) {
+- if (first) {
+- first = false;
+- start = System.currentTimeMillis();
+- }
+- sender.setMessage(buf);
+- int selectedKeys = 0;
+- try {
+- selectedKeys = selector.select(0);
+- } catch (Exception e) {
+- e.printStackTrace();
+- continue;
+- }
+-
+- if (selectedKeys == 0) {
+- continue;
+- }
+-
+- Iterator it = selector.selectedKeys().iterator();
+- while (it.hasNext()) {
+- SelectionKey sk = (SelectionKey) it.next();
+- it.remove();
+- try {
+- int readyOps = sk.readyOps();
+- sk.interestOps(sk.interestOps() & ~readyOps);
+- if (sender.process(sk, false)) {
+- total = total.add(bytes);
+- sender.reset();
+- sender.setMessage(buf);
+- mb += ( (double) len) / 1024 / 1024;
+- if ( ( (++count) % 10000) == 0) {
+- long time = System.currentTimeMillis();
+- double seconds = ( (double) (time - start)) / 1000;
+- System.out.println("Throughput " + df.format(mb /
seconds) + " MB/seconds, total "+mb+" MB, total "+total+"
bytes.");
+- }
+- }
+-
+- } catch (Throwable t) {
+- t.printStackTrace();
+- return;
+- }
+- }
+- selector.selectedKeys().clear();
+- }
+- System.out.println("Complete, sleeping 15 seconds");
+- Thread.sleep(15000);
+- }
+-}
+Index: test/org/apache/catalina/tribes/test/transport/SocketReceive.java
+===================================================================
+--- test/org/apache/catalina/tribes/test/transport/SocketReceive.java (revision 590752)
++++ test/org/apache/catalina/tribes/test/transport/SocketReceive.java (working copy)
+@@ -1,78 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.test.transport;
+-
+-import java.net.ServerSocket;
+-import java.net.Socket;
+-import java.io.InputStream;
+-import java.text.DecimalFormat;
+-import java.math.BigDecimal;
+-
+-public class SocketReceive {
+- static long start = 0;
+- static double mb = 0;
+- static byte[] buf = new byte[8192 * 4];
+- static boolean first = true;
+- static int count = 0;
+- static DecimalFormat df = new DecimalFormat("##.00");
+- static BigDecimal total = new BigDecimal(0);
+- static BigDecimal bytes = new BigDecimal(32871);
+-
+-
+- public static void main(String[] args) throws Exception {
+-
+- ServerSocket srvSocket = new ServerSocket(9999);
+- System.out.println("Listening on 9999");
+- Socket socket = srvSocket.accept();
+- socket.setReceiveBufferSize(43800);
+- InputStream in = socket.getInputStream();
+- Thread t = new Thread() {
+- public void run() {
+- while ( true ) {
+- try {
+- Thread.sleep(1000);
+- printStats(start, mb, count, df, total);
+- }catch ( Exception x ) {}
+- }
+- }
+- };
+- t.setDaemon(true);
+- t.start();
+-
+- while ( true ) {
+- if ( first ) { first = false; start = System.currentTimeMillis();}
+- int len = in.read(buf);
+- if ( len == -1 ) {
+- printStats(start, mb, count, df, total);
+- System.exit(1);
+- }
+- if ( bytes.intValue() != len ) bytes = new BigDecimal((double)len);
+- total = total.add(bytes);
+- mb += ( (double) len) / 1024 / 1024;
+- if ( ((++count) % 10000) == 0 ) {
+- printStats(start, mb, count, df, total);
+- }
+- }
+-
+- }
+-
+- private static void printStats(long start, double mb, int count, DecimalFormat df,
BigDecimal total) {
+- long time = System.currentTimeMillis();
+- double seconds = ((double)(time-start))/1000;
+- System.out.println("Throughput "+df.format(mb/seconds)+"
MB/seconds messages "+count+", total "+mb+" MB, total
"+total+" bytes.");
+- }
+-}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/demos/LoadTest.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/LoadTest.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/LoadTest.java (working copy)
+@@ -1,426 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.demos;
+-
+-import java.io.Serializable;
+-import java.util.Random;
+-
+-import org.apache.catalina.tribes.ByteMessage;
+-import org.apache.catalina.tribes.ChannelException;
+-import org.apache.catalina.tribes.ChannelListener;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.io.XByteBuffer;
+-import org.apache.catalina.tribes.Channel;
+-import java.io.Externalizable;
+-import org.apache.juli.logging.Log;
+-import org.apache.juli.logging.LogFactory;
+-
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class LoadTest implements MembershipListener,ChannelListener, Runnable {
+- protected static Log log = LogFactory.getLog(LoadTest.class);
+- public static int size = 24000;
+- public static Object mutex = new Object();
+- public boolean doRun = true;
+-
+- public long bytesReceived = 0;
+- public float mBytesReceived = 0;
+- public int messagesReceived = 0;
+- public boolean send = true;
+- public boolean debug = false;
+- public int msgCount = 100;
+- ManagedChannel channel=null;
+- public int statsInterval = 10000;
+- public long pause = 0;
+- public boolean breakonChannelException = false;
+- public boolean async = false;
+- public long receiveStart = 0;
+- public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
+-
+- static int messageSize = 0;
+-
+- public static long messagesSent = 0;
+- public static long messageStartSendTime = 0;
+- public static long messageEndSendTime = 0;
+- public static int threadCount = 0;
+-
+- public static synchronized void startTest() {
+- threadCount++;
+- if ( messageStartSendTime == 0 ) messageStartSendTime =
System.currentTimeMillis();
+- }
+-
+- public static synchronized void endTest() {
+- threadCount--;
+- if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime =
System.currentTimeMillis();
+- }
+-
+-
+- public static synchronized long addSendStats(long count) {
+- messagesSent+=count;
+- return 0l;
+- }
+-
+- private static void printSendStats(long counter, int messageSize) {
+- float cnt = (float)counter;
+- float size = (float)messageSize;
+- float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
+- log.info("****SEND
STATS-"+Thread.currentThread().getName()+"*****"+
+- "\n\tMessage count:"+counter+
+- "\n\tTotal bytes :"+(long)(size*cnt)+
+- "\n\tTotal seconds:"+(time)+
+- "\n\tBytes/second :"+(size*cnt/time)+
+- "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
+- }
+-
+-
+-
+- public LoadTest(ManagedChannel channel,
+- boolean send,
+- int msgCount,
+- boolean debug,
+- long pause,
+- int stats,
+- boolean breakOnEx) {
+- this.channel = channel;
+- this.send = send;
+- this.msgCount = msgCount;
+- this.debug = debug;
+- this.pause = pause;
+- this.statsInterval = stats;
+- this.breakonChannelException = breakOnEx;
+- }
+-
+-
+-
+- public void run() {
+-
+- long counter = 0;
+- long total = 0;
+- LoadMessage msg = new LoadMessage();
+- int messageSize = LoadTest.messageSize;
+-
+- try {
+- startTest();
+- while (total < msgCount) {
+- if (channel.getMembers().length == 0 || (!send)) {
+- synchronized (mutex) {
+- try {
+- mutex.wait();
+- } catch (InterruptedException x) {
+- log.info("Thread interrupted from wait");
+- }
+- }
+- } else {
+- try {
+- //msg.setMsgNr((int)++total);
+- counter++;
+- if (debug) {
+- printArray(msg.getMessage());
+- }
+- channel.send(channel.getMembers(), msg, channelOptions);
+- if ( pause > 0 ) {
+- if ( debug) System.out.println("Pausing sender for
"+pause+" ms.");
+- Thread.sleep(pause);
+- }
+- } catch (ChannelException x) {
+- if ( debug ) log.error("Unable to send
message:"+x.getMessage(),x);
+- log.error("Unable to send message:"+x.getMessage());
+- ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
+- for (int i=0; i<faulty.length; i++ ) log.error("Faulty:
"+faulty[i]);
+- --counter;
+- if ( this.breakonChannelException ) throw x;
+- }
+- }
+- if ( (counter % statsInterval) == 0 && (counter > 0)) {
+- //add to the global counter
+- counter = addSendStats(counter);
+- //print from the global counter
+- //printSendStats(LoadTest.messagesSent, LoadTest.messageSize,
LoadTest.messageSendTime);
+- printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
+-
+- }
+-
+- }
+- }catch ( Exception x ) {
+- log.error("Captured error while sending:"+x.getMessage());
+- if ( debug ) log.error("",x);
+- printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
+- }
+- endTest();
+- }
+-
+-
+-
+- /**
+- * memberAdded
+- *
+- * @param member Member
+- * @todo Implement this org.apache.catalina.tribes.MembershipListener
+- * method
+- */
+- public void memberAdded(Member member) {
+- log.info("Member added:"+member);
+- synchronized (mutex) {
+- mutex.notifyAll();
+- }
+- }
+-
+- /**
+- * memberDisappeared
+- *
+- * @param member Member
+- * @todo Implement this org.apache.catalina.tribes.MembershipListener
+- * method
+- */
+- public void memberDisappeared(Member member) {
+- log.info("Member disappeared:"+member);
+- }
+-
+- public boolean accept(Serializable msg, Member mbr){
+- return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
+- }
+-
+- public void messageReceived(Serializable msg, Member mbr){
+- if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
+- if ( debug ) {
+- if ( msg instanceof LoadMessage ) {
+- printArray(((LoadMessage)msg).getMessage());
+- }
+- }
+-
+- if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
+- LoadMessage tmp = new LoadMessage();
+- tmp.setMessage(((ByteMessage)msg).getMessage());
+- msg = tmp;
+- tmp = null;
+- }
+-
+-
+- bytesReceived+=((LoadMessage)msg).getMessage().length;
+- mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f;
+- messagesReceived++;
+- if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
+- float bytes =
(float)(((LoadMessage)msg).getMessage().length*messagesReceived);
+- float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
+- log.info("****RECEIVE
STATS-"+Thread.currentThread().getName()+"*****"+
+- "\n\tMessage count :"+(long)messagesReceived+
+- "\n\tMessage/sec :"+messagesReceived/seconds+
+- "\n\tTotal bytes :"+(long)bytes+
+- "\n\tTotal mbytes :"+(long)mBytesReceived+
+- "\n\tTime since 1st:"+seconds+" seconds"+
+- "\n\tBytes/second :"+(bytes/seconds)+
+- "\n\tMBytes/second
:"+(mBytesReceived/seconds)+"\n");
+-
+- }
+- }
+-
+-
+- public static void printArray(byte[] data) {
+- System.out.print("{");
+- for (int i=0; i<data.length; i++ ) {
+- System.out.print(data[i]);
+- System.out.print(",");
+- }
+- System.out.println("} size:"+data.length);
+- }
+-
+-
+-
+- //public static class LoadMessage implements Serializable {
+- public static class LoadMessage extends ByteMessage implements Serializable {
+-
+- public static byte[] outdata = new byte[size];
+- public static Random r = new Random(System.currentTimeMillis());
+- public static int getMessageSize (LoadMessage msg) {
+- int messageSize = msg.getMessage().length;
+- if ( ((Object)msg) instanceof ByteMessage ) return messageSize;
+- try {
+- messageSize = XByteBuffer.serialize(new LoadMessage()).length;
+- log.info("Average message size:" + messageSize + "
bytes");
+- } catch (Exception x) {
+- log.error("Unable to calculate test message size.", x);
+- }
+- return messageSize;
+- }
+- static {
+- r.nextBytes(outdata);
+- }
+-
+- protected byte[] message = getMessage();
+-
+- public LoadMessage() {
+- }
+-
+- public byte[] getMessage() {
+- if ( message == null ) {
+- message = outdata;
+- }
+- return message;
+- }
+-
+- public void setMessage(byte[] data) {
+- this.message = data;
+- }
+- }
+-
+- public static void usage() {
+- System.out.println("Tribes Load tester.");
+- System.out.println("The load tester can be used in sender or received mode
or both");
+- System.out.println("Usage:\n\t"+
+- "java LoadTest [options]\n\t"+
+- "Options:\n\t\t"+
+- "[-mode receive|send|both] \n\t\t"+
+- "[-startoptions startflags (default is Channel.DEFAULT)
] \n\t\t"+
+- "[-debug] \n\t\t"+
+- "[-count messagecount] \n\t\t"+
+- "[-stats statinterval] \n\t\t"+
+- "[-pause nrofsecondstopausebetweensends] \n\t\t"+
+- "[-threads numberofsenderthreads] \n\t\t"+
+- "[-size messagesize] \n\t\t"+
+- "[-sendoptions channeloptions] \n\t\t"+
+- "[-break (halts execution on exception)]\n"+
+- "[-shutdown (issues a channel.stop() command after send
is completed)]\n"+
+- "\tChannel options:"+
+- ChannelCreator.usage()+"\n\n"+
+- "Example:\n\t"+
+- "java LoadTest -port 4004\n\t"+
+- "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
+- "java LoadTest -bind 192.168.0.45 -port 4005 -mbind
192.168.0.45 -count 100 -stats 10\n");
+- }
+-
+- public static void main(String[] args) throws Exception {
+- boolean send = true;
+- boolean debug = false;
+- long pause = 0;
+- int count = 1000000;
+- int stats = 10000;
+- boolean breakOnEx = false;
+- int threads = 1;
+- boolean shutdown = false;
+- int startoptions = Channel.DEFAULT;
+- int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
+- if ( args.length == 0 ) {
+- args = new String[] {"-help"};
+- }
+- for (int i = 0; i < args.length; i++) {
+- if ("-threads".equals(args[i])) {
+- threads = Integer.parseInt(args[++i]);
+- } else if ("-count".equals(args[i])) {
+- count = Integer.parseInt(args[++i]);
+- System.out.println("Sending "+count+" messages.");
+- } else if ("-pause".equals(args[i])) {
+- pause = Long.parseLong(args[++i])*1000;
+- } else if ("-break".equals(args[i])) {
+- breakOnEx = true;
+- } else if ("-shutdown".equals(args[i])) {
+- shutdown = true;
+- } else if ("-stats".equals(args[i])) {
+- stats = Integer.parseInt(args[++i]);
+- System.out.println("Stats every "+stats+"
message");
+- } else if ("-sendoptions".equals(args[i])) {
+- channelOptions = Integer.parseInt(args[++i]);
+- System.out.println("Setting send options to
"+channelOptions);
+- } else if ("-startoptions".equals(args[i])) {
+- startoptions = Integer.parseInt(args[++i]);
+- System.out.println("Setting start options to "+startoptions);
+- } else if ("-size".equals(args[i])) {
+- size = Integer.parseInt(args[++i])-4;
+- System.out.println("Message size will be:"+(size+4)+"
bytes");
+- } else if ("-mode".equals(args[i])) {
+- if ( "receive".equals(args[++i]) ) send = false;
+- } else if ("-debug".equals(args[i])) {
+- debug = true;
+- } else if ("-help".equals(args[i]))
+- {
+- usage();
+- System.exit(1);
+- }
+- }
+-
+- ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+-
+- LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+- test.channelOptions = channelOptions;
+- LoadMessage msg = new LoadMessage();
+-
+- messageSize = LoadMessage.getMessageSize(msg);
+- channel.addChannelListener(test);
+- channel.addMembershipListener(test);
+- channel.start(startoptions);
+- Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+- while ( threads > 1 ) {
+- Thread t = new Thread(test);
+- t.setDaemon(true);
+- t.start();
+- threads--;
+- test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
+- test.channelOptions = channelOptions;
+- }
+- test.run();
+- if ( shutdown && send ) channel.stop(channel.DEFAULT);
+- System.out.println("System test complete, sleeping to let threads
finish.");
+- Thread.sleep(60*1000*60);
+- }
+-
+- public static class Shutdown extends Thread {
+- ManagedChannel channel = null;
+- public Shutdown(ManagedChannel channel) {
+- this.channel = channel;
+- }
+-
+- public void run() {
+- System.out.println("Shutting down...");
+- SystemExit exit = new SystemExit(5000);
+- exit.setDaemon(true);
+- exit.start();
+- try {
+- channel.stop(channel.DEFAULT);
+-
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- }
+- System.out.println("Channel stopped.");
+- }
+- }
+- public static class SystemExit extends Thread {
+- private long delay;
+- public SystemExit(long delay) {
+- this.delay = delay;
+- }
+- public void run () {
+- try {
+- Thread.sleep(delay);
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- }
+- System.exit(0);
+-
+- }
+- }
+-
+-}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/demos/IntrospectionUtils.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/IntrospectionUtils.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/IntrospectionUtils.java (working copy)
+@@ -1,1004 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-
+-package org.apache.catalina.tribes.demos;
+-
+-import java.io.File;
+-import java.io.FilenameFilter;
+-import java.io.IOException;
+-import java.lang.reflect.InvocationTargetException;
+-import java.lang.reflect.Method;
+-import java.net.InetAddress;
+-import java.net.MalformedURLException;
+-import java.net.URL;
+-import java.net.UnknownHostException;
+-import java.util.Hashtable;
+-import java.util.StringTokenizer;
+-import java.util.Vector;
+-import org.apache.juli.logging.LogFactory;
+-import org.apache.juli.logging.Log;
+-
+-// Depends: JDK1.1
+-
+-/**
+- * Utils for introspection and reflection
+- */
+-public final class IntrospectionUtils {
+-
+-
+- private static Log log= LogFactory.getLog( IntrospectionUtils.class );
+-
+- /**
+- * Call execute() - any ant-like task should work
+- */
+- public static void execute(Object proxy, String method) throws Exception {
+- Method executeM = null;
+- Class c = proxy.getClass();
+- Class params[] = new Class[0];
+- // params[0]=args.getClass();
+- executeM = findMethod(c, method, params);
+- if (executeM == null) {
+- throw new RuntimeException("No execute in " + proxy.getClass());
+- }
+- executeM.invoke(proxy, (Object[]) null);//new Object[] { args });
+- }
+-
+- /**
+- * Call void setAttribute( String ,Object )
+- */
+- public static void setAttribute(Object proxy, String n, Object v)
+- throws Exception {
+- if (proxy instanceof AttributeHolder) {
+- ((AttributeHolder) proxy).setAttribute(n, v);
+- return;
+- }
+-
+- Method executeM = null;
+- Class c = proxy.getClass();
+- Class params[] = new Class[2];
+- params[0] = String.class;
+- params[1] = Object.class;
+- executeM = findMethod(c, "setAttribute", params);
+- if (executeM == null) {
+- if (log.isDebugEnabled())
+- log.debug("No setAttribute in " + proxy.getClass());
+- return;
+- }
+- if (false)
+- if (log.isDebugEnabled())
+- log.debug("Setting " + n + "=" + v + " in
" + proxy);
+- executeM.invoke(proxy, new Object[] { n, v });
+- return;
+- }
+-
+- /**
+- * Call void getAttribute( String )
+- */
+- public static Object getAttribute(Object proxy, String n) throws Exception {
+- Method executeM = null;
+- Class c = proxy.getClass();
+- Class params[] = new Class[1];
+- params[0] = String.class;
+- executeM = findMethod(c, "getAttribute", params);
+- if (executeM == null) {
+- if (log.isDebugEnabled())
+- log.debug("No getAttribute in " + proxy.getClass());
+- return null;
+- }
+- return executeM.invoke(proxy, new Object[] { n });
+- }
+-
+- /**
+- * Construct a URLClassLoader. Will compile and work in JDK1.1 too.
+- */
+- public static ClassLoader getURLClassLoader(URL urls[], ClassLoader parent) {
+- try {
+- Class urlCL = Class.forName("java.net.URLClassLoader");
+- Class paramT[] = new Class[2];
+- paramT[0] = urls.getClass();
+- paramT[1] = ClassLoader.class;
+- Method m = findMethod(urlCL, "newInstance", paramT);
+- if (m == null)
+- return null;
+-
+- ClassLoader cl = (ClassLoader) m.invoke(urlCL, new Object[] { urls,
+- parent });
+- return cl;
+- } catch (ClassNotFoundException ex) {
+- // jdk1.1
+- return null;
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- return null;
+- }
+- }
+-
+- public static String guessInstall(String installSysProp,
+- String homeSysProp, String jarName) {
+- return guessInstall(installSysProp, homeSysProp, jarName, null);
+- }
+-
+- /**
+- * Guess a product install/home by analyzing the class path. It works for
+- * product using the pattern: lib/executable.jar or if executable.jar is
+- * included in classpath by a shell script. ( java -jar also works )
+- *
+- * Insures both "install" and "home" System properties are set.
If either or
+- * both System properties are unset, "install" and "home" will
be set to the
+- * same value. This value will be the other System property that is set, or
+- * the guessed value if neither is set.
+- */
+- public static String guessInstall(String installSysProp,
+- String homeSysProp, String jarName, String classFile) {
+- String install = null;
+- String home = null;
+-
+- if (installSysProp != null)
+- install = System.getProperty(installSysProp);
+-
+- if (homeSysProp != null)
+- home = System.getProperty(homeSysProp);
+-
+- if (install != null) {
+- if (home == null)
+- System.getProperties().put(homeSysProp, install);
+- return install;
+- }
+-
+- // Find the directory where jarName.jar is located
+-
+- String cpath = System.getProperty("java.class.path");
+- String pathSep = System.getProperty("path.separator");
+- StringTokenizer st = new StringTokenizer(cpath, pathSep);
+- while (st.hasMoreTokens()) {
+- String path = st.nextToken();
+- // log( "path " + path );
+- if (path.endsWith(jarName)) {
+- home = path.substring(0, path.length() - jarName.length());
+- try {
+- if ("".equals(home)) {
+- home = new File("./").getCanonicalPath();
+- } else if (home.endsWith(File.separator)) {
+- home = home.substring(0, home.length() - 1);
+- }
+- File f = new File(home);
+- String parentDir = f.getParent();
+- if (parentDir == null)
+- parentDir = home; // unix style
+- File f1 = new File(parentDir);
+- install = f1.getCanonicalPath();
+- if (installSysProp != null)
+- System.getProperties().put(installSysProp, install);
+- if (home == null && homeSysProp != null)
+- System.getProperties().put(homeSysProp, install);
+- return install;
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- }
+- } else {
+- String fname = path + (path.endsWith("/") ? "" :
"/")
+- + classFile;
+- if (new File(fname).exists()) {
+- try {
+- File f = new File(path);
+- String parentDir = f.getParent();
+- if (parentDir == null)
+- parentDir = path; // unix style
+- File f1 = new File(parentDir);
+- install = f1.getCanonicalPath();
+- if (installSysProp != null)
+- System.getProperties().put(installSysProp, install);
+- if (home == null && homeSysProp != null)
+- System.getProperties().put(homeSysProp, install);
+- return install;
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- }
+- }
+- }
+- }
+-
+- // if install directory can't be found, use home as the default
+- if (home != null) {
+- System.getProperties().put(installSysProp, home);
+- return home;
+- }
+-
+- return null;
+- }
+-
+- /**
+- * Debug method, display the classpath
+- */
+- public static void displayClassPath(String msg, URL[] cp) {
+- if (log.isDebugEnabled()) {
+- log.debug(msg);
+- for (int i = 0; i < cp.length; i++) {
+- log.debug(cp[i].getFile());
+- }
+- }
+- }
+-
+- public static String PATH_SEPARATOR =
System.getProperty("path.separator");
+-
+- /**
+- * Adds classpath entries from a vector of URL's to the "tc_path_add"
System
+- * property. This System property lists the classpath entries common to web
+- * applications. This System property is currently used by Jasper when its
+- * JSP servlet compiles the Java file for a JSP.
+- */
+- public static String classPathAdd(URL urls[], String cp) {
+- if (urls == null)
+- return cp;
+-
+- for (int i = 0; i < urls.length; i++) {
+- if (cp != null)
+- cp += PATH_SEPARATOR + urls[i].getFile();
+- else
+- cp = urls[i].getFile();
+- }
+- return cp;
+- }
+-
+- /**
+- * Find a method with the right name If found, call the method ( if param is
+- * int or boolean we'll convert value to the right type before) - that means
+- * you can have setDebug(1).
+- */
+- public static void setProperty(Object o, String name, String value) {
+- if (dbg > 1)
+- d("setProperty(" + o.getClass() + " " + name +
"=" + value + ")");
+-
+- String setter = "set" + capitalize(name);
+-
+- try {
+- Method methods[] = findMethods(o.getClass());
+- Method setPropertyMethod = null;
+-
+- // First, the ideal case - a setFoo( String ) method
+- for (int i = 0; i < methods.length; i++) {
+- Class paramT[] = methods[i].getParameterTypes();
+- if (setter.equals(methods[i].getName()) && paramT.length == 1
+- &&
"java.lang.String".equals(paramT[0].getName())) {
+-
+- methods[i].invoke(o, new Object[] { value });
+- return;
+- }
+- }
+-
+- // Try a setFoo ( int ) or ( boolean )
+- for (int i = 0; i < methods.length; i++) {
+- boolean ok = true;
+- if (setter.equals(methods[i].getName())
+- && methods[i].getParameterTypes().length == 1) {
+-
+- // match - find the type and invoke it
+- Class paramType = methods[i].getParameterTypes()[0];
+- Object params[] = new Object[1];
+-
+- // Try a setFoo ( int )
+- if ("java.lang.Integer".equals(paramType.getName())
+- || "int".equals(paramType.getName())) {
+- try {
+- params[0] = new Integer(value);
+- } catch (NumberFormatException ex) {
+- ok = false;
+- }
+- // Try a setFoo ( long )
+- }else if ("java.lang.Long".equals(paramType.getName())
+- || "long".equals(paramType.getName())) {
+- try {
+- params[0] = new Long(value);
+- } catch (NumberFormatException ex) {
+- ok = false;
+- }
+-
+- // Try a setFoo ( boolean )
+- } else if
("java.lang.Boolean".equals(paramType.getName())
+- || "boolean".equals(paramType.getName())) {
+- params[0] = new Boolean(value);
+-
+- // Try a setFoo ( InetAddress )
+- } else if ("java.net.InetAddress".equals(paramType
+- .getName())) {
+- try {
+- params[0] = InetAddress.getByName(value);
+- } catch (UnknownHostException exc) {
+- d("Unable to resolve host name:" + value);
+- ok = false;
+- }
+-
+- // Unknown type
+- } else {
+- d("Unknown type " + paramType.getName());
+- }
+-
+- if (ok) {
+- methods[i].invoke(o, params);
+- return;
+- }
+- }
+-
+- // save "setProperty" for later
+- if ("setProperty".equals(methods[i].getName())) {
+- setPropertyMethod = methods[i];
+- }
+- }
+-
+- // Ok, no setXXX found, try a setProperty("name",
"value")
+- if (setPropertyMethod != null) {
+- Object params[] = new Object[2];
+- params[0] = name;
+- params[1] = value;
+- setPropertyMethod.invoke(o, params);
+- }
+-
+- } catch (IllegalArgumentException ex2) {
+- log.warn("IAE " + o + " " + name + " " +
value, ex2);
+- } catch (SecurityException ex1) {
+- if (dbg > 0)
+- d("SecurityException for " + o.getClass() + " " +
name + "="
+- + value + ")");
+- if (dbg > 1)
+- ex1.printStackTrace();
+- } catch (IllegalAccessException iae) {
+- if (dbg > 0)
+- d("IllegalAccessException for " + o.getClass() + " "
+ name
+- + "=" + value + ")");
+- if (dbg > 1)
+- iae.printStackTrace();
+- } catch (InvocationTargetException ie) {
+- if (dbg > 0)
+- d("InvocationTargetException for " + o.getClass() + "
" + name
+- + "=" + value + ")");
+- if (dbg > 1)
+- ie.printStackTrace();
+- }
+- }
+-
+- public static Object getProperty(Object o, String name) {
+- String getter = "get" + capitalize(name);
+- String isGetter = "is" + capitalize(name);
+-
+- try {
+- Method methods[] = findMethods(o.getClass());
+- Method getPropertyMethod = null;
+-
+- // First, the ideal case - a getFoo() method
+- for (int i = 0; i < methods.length; i++) {
+- Class paramT[] = methods[i].getParameterTypes();
+- if (getter.equals(methods[i].getName()) && paramT.length == 0)
{
+- return methods[i].invoke(o, (Object[]) null);
+- }
+- if (isGetter.equals(methods[i].getName()) && paramT.length == 0)
{
+- return methods[i].invoke(o, (Object[]) null);
+- }
+-
+- if ("getProperty".equals(methods[i].getName())) {
+- getPropertyMethod = methods[i];
+- }
+- }
+-
+- // Ok, no setXXX found, try a getProperty("name")
+- if (getPropertyMethod != null) {
+- Object params[] = new Object[1];
+- params[0] = name;
+- return getPropertyMethod.invoke(o, params);
+- }
+-
+- } catch (IllegalArgumentException ex2) {
+- log.warn("IAE " + o + " " + name, ex2);
+- } catch (SecurityException ex1) {
+- if (dbg > 0)
+- d("SecurityException for " + o.getClass() + " " +
name + ")");
+- if (dbg > 1)
+- ex1.printStackTrace();
+- } catch (IllegalAccessException iae) {
+- if (dbg > 0)
+- d("IllegalAccessException for " + o.getClass() + " "
+ name
+- + ")");
+- if (dbg > 1)
+- iae.printStackTrace();
+- } catch (InvocationTargetException ie) {
+- if (dbg > 0)
+- d("InvocationTargetException for " + o.getClass() + "
" + name
+- + ")");
+- if (dbg > 1)
+- ie.printStackTrace();
+- }
+- return null;
+- }
+-
+- /**
+- */
+- public static void setProperty(Object o, String name) {
+- String setter = "set" + capitalize(name);
+- try {
+- Method methods[] = findMethods(o.getClass());
+- Method setPropertyMethod = null;
+- // find setFoo() method
+- for (int i = 0; i < methods.length; i++) {
+- Class paramT[] = methods[i].getParameterTypes();
+- if (setter.equals(methods[i].getName()) && paramT.length == 0)
{
+- methods[i].invoke(o, new Object[] {});
+- return;
+- }
+- }
+- } catch (Exception ex1) {
+- if (dbg > 0)
+- d("Exception for " + o.getClass() + " " + name);
+- if (dbg > 1)
+- ex1.printStackTrace();
+- }
+- }
+-
+- /**
+- * Replace ${NAME} with the property value
+- *
+- * @deprecated Use the explicit method
+- */
+- public static String replaceProperties(String value, Object getter) {
+- if (getter instanceof Hashtable)
+- return replaceProperties(value, (Hashtable) getter, null);
+-
+- if (getter instanceof PropertySource) {
+- PropertySource src[] = new PropertySource[] { (PropertySource) getter };
+- return replaceProperties(value, null, src);
+- }
+- return value;
+- }
+-
+- /**
+- * Replace ${NAME} with the property value
+- */
+- public static String replaceProperties(String value, Hashtable staticProp,
+- PropertySource dynamicProp[]) {
+- StringBuffer sb = new StringBuffer();
+- int prev = 0;
+- // assert value!=nil
+- int pos;
+- while ((pos = value.indexOf("$", prev)) >= 0) {
+- if (pos > 0) {
+- sb.append(value.substring(prev, pos));
+- }
+- if (pos == (value.length() - 1)) {
+- sb.append('$');
+- prev = pos + 1;
+- } else if (value.charAt(pos + 1) != '{') {
+- sb.append('$');
+- prev = pos + 1; // XXX
+- } else {
+- int endName = value.indexOf('}', pos);
+- if (endName < 0) {
+- sb.append(value.substring(pos));
+- prev = value.length();
+- continue;
+- }
+- String n = value.substring(pos + 2, endName);
+- String v = null;
+- if (staticProp != null) {
+- v = (String) ((Hashtable) staticProp).get(n);
+- }
+- if (v == null && dynamicProp != null) {
+- for (int i = 0; i < dynamicProp.length; i++) {
+- v = dynamicProp[i].getProperty(n);
+- if (v != null) {
+- break;
+- }
+- }
+- }
+- if (v == null)
+- v = "${" + n + "}";
+-
+- sb.append(v);
+- prev = endName + 1;
+- }
+- }
+- if (prev < value.length())
+- sb.append(value.substring(prev));
+- return sb.toString();
+- }
+-
+- /**
+- * Reverse of Introspector.decapitalize
+- */
+- public static String capitalize(String name) {
+- if (name == null || name.length() == 0) {
+- return name;
+- }
+- char chars[] = name.toCharArray();
+- chars[0] = Character.toUpperCase(chars[0]);
+- return new String(chars);
+- }
+-
+- public static String unCapitalize(String name) {
+- if (name == null || name.length() == 0) {
+- return name;
+- }
+- char chars[] = name.toCharArray();
+- chars[0] = Character.toLowerCase(chars[0]);
+- return new String(chars);
+- }
+-
+- // -------------------- Class path tools --------------------
+-
+- /**
+- * Add all the jar files in a dir to the classpath, represented as a Vector
+- * of URLs.
+- */
+- public static void addToClassPath(Vector cpV, String dir) {
+- try {
+- String cpComp[] = getFilesByExt(dir, ".jar");
+- if (cpComp != null) {
+- int jarCount = cpComp.length;
+- for (int i = 0; i < jarCount; i++) {
+- URL url = getURL(dir, cpComp[i]);
+- if (url != null)
+- cpV.addElement(url);
+- }
+- }
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- }
+- }
+-
+- public static void addToolsJar(Vector v) {
+- try {
+- // Add tools.jar in any case
+- File f = new File(System.getProperty("java.home")
+- + "/../lib/tools.jar");
+-
+- if (!f.exists()) {
+- // On some systems java.home gets set to the root of jdk.
+- // That's a bug, but we can work around and be nice.
+- f = new File(System.getProperty("java.home") +
"/lib/tools.jar");
+- if (f.exists()) {
+- if (log.isDebugEnabled())
+- log.debug("Detected strange java.home value "
+- + System.getProperty("java.home")
+- + ", it should point to jre");
+- }
+- }
+- URL url = new URL("file", "", f.getAbsolutePath());
+-
+- v.addElement(url);
+- } catch (MalformedURLException ex) {
+- ex.printStackTrace();
+- }
+- }
+-
+- /**
+- * Return all files with a given extension in a dir
+- */
+- public static String[] getFilesByExt(String ld, String ext) {
+- File dir = new File(ld);
+- String[] names = null;
+- final String lext = ext;
+- if (dir.isDirectory()) {
+- names = dir.list(new FilenameFilter() {
+- public boolean accept(File d, String name) {
+- if (name.endsWith(lext)) {
+- return true;
+- }
+- return false;
+- }
+- });
+- }
+- return names;
+- }
+-
+- /**
+- * Construct a file url from a file, using a base dir
+- */
+- public static URL getURL(String base, String file) {
+- try {
+- File baseF = new File(base);
+- File f = new File(baseF, file);
+- String path = f.getCanonicalPath();
+- if (f.isDirectory()) {
+- path += "/";
+- }
+- if (!f.exists())
+- return null;
+- return new URL("file", "", path);
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- return null;
+- }
+- }
+-
+- /**
+- * Add elements from the classpath <i>cp </i> to a Vector <i>jars
</i> as
+- * file URLs (We use Vector for JDK 1.1 compat).
+- * <p>
+- *
+- * @param jars The jar list
+- * @param cp a String classpath of directory or jar file elements
+- * separated by path.separator delimiters.
+- * @throws IOException If an I/O error occurs
+- * @throws MalformedURLException Doh ;)
+- */
+- public static void addJarsFromClassPath(Vector jars, String cp)
+- throws IOException, MalformedURLException {
+- String sep = System.getProperty("path.separator");
+- String token;
+- StringTokenizer st;
+- if (cp != null) {
+- st = new StringTokenizer(cp, sep);
+- while (st.hasMoreTokens()) {
+- File f = new File(st.nextToken());
+- String path = f.getCanonicalPath();
+- if (f.isDirectory()) {
+- path += "/";
+- }
+- URL url = new URL("file", "", path);
+- if (!jars.contains(url)) {
+- jars.addElement(url);
+- }
+- }
+- }
+- }
+-
+- /**
+- * Return a URL[] that can be used to construct a class loader
+- */
+- public static URL[] getClassPath(Vector v) {
+- URL[] urls = new URL[v.size()];
+- for (int i = 0; i < v.size(); i++) {
+- urls[i] = (URL) v.elementAt(i);
+- }
+- return urls;
+- }
+-
+- /**
+- * Construct a URL classpath from files in a directory, a cpath property,
+- * and tools.jar.
+- */
+- public static URL[] getClassPath(String dir, String cpath,
+- String cpathProp, boolean addTools) throws IOException,
+- MalformedURLException {
+- Vector jarsV = new Vector();
+- if (dir != null) {
+- // Add dir/classes first, if it exists
+- URL url = getURL(dir, "classes");
+- if (url != null)
+- jarsV.addElement(url);
+- addToClassPath(jarsV, dir);
+- }
+-
+- if (cpath != null)
+- addJarsFromClassPath(jarsV, cpath);
+-
+- if (cpathProp != null) {
+- String cpath1 = System.getProperty(cpathProp);
+- addJarsFromClassPath(jarsV, cpath1);
+- }
+-
+- if (addTools)
+- addToolsJar(jarsV);
+-
+- return getClassPath(jarsV);
+- }
+-
+- // -------------------- Mapping command line params to setters
+-
+- public static boolean processArgs(Object proxy, String args[])
+- throws Exception {
+- String args0[] = null;
+- if (null != findMethod(proxy.getClass(), "getOptions1", new Class[]
{})) {
+- args0 = (String[]) callMethod0(proxy, "getOptions1");
+- }
+-
+- if (args0 == null) {
+- //args0=findVoidSetters(proxy.getClass());
+- args0 = findBooleanSetters(proxy.getClass());
+- }
+- Hashtable h = null;
+- if (null != findMethod(proxy.getClass(), "getOptionAliases",
+- new Class[] {})) {
+- h = (Hashtable) callMethod0(proxy, "getOptionAliases");
+- }
+- return processArgs(proxy, args, args0, null, h);
+- }
+-
+- public static boolean processArgs(Object proxy, String args[],
+- String args0[], String args1[], Hashtable aliases) throws Exception {
+- for (int i = 0; i < args.length; i++) {
+- String arg = args[i];
+- if (arg.startsWith("-"))
+- arg = arg.substring(1);
+- if (aliases != null && aliases.get(arg) != null)
+- arg = (String) aliases.get(arg);
+-
+- if (args0 != null) {
+- boolean set = false;
+- for (int j = 0; j < args0.length; j++) {
+- if (args0[j].equalsIgnoreCase(arg)) {
+- setProperty(proxy, args0[j], "true");
+- set = true;
+- break;
+- }
+- }
+- if (set)
+- continue;
+- }
+- if (args1 != null) {
+- for (int j = 0; j < args1.length; j++) {
+- if (args1[j].equalsIgnoreCase(arg)) {
+- i++;
+- if (i >= args.length)
+- return false;
+- setProperty(proxy, arg, args[i]);
+- break;
+- }
+- }
+- } else {
+- // if args1 is not specified,assume all other options have param
+- i++;
+- if (i >= args.length)
+- return false;
+- setProperty(proxy, arg, args[i]);
+- }
+-
+- }
+- return true;
+- }
+-
+- // -------------------- other utils --------------------
+- public static void clear() {
+- objectMethods.clear();
+- }
+-
+- public static String[] findVoidSetters(Class c) {
+- Method m[] = findMethods(c);
+- if (m == null)
+- return null;
+- Vector v = new Vector();
+- for (int i = 0; i < m.length; i++) {
+- if (m[i].getName().startsWith("set")
+- && m[i].getParameterTypes().length == 0) {
+- String arg = m[i].getName().substring(3);
+- v.addElement(unCapitalize(arg));
+- }
+- }
+- String s[] = new String[v.size()];
+- for (int i = 0; i < s.length; i++) {
+- s[i] = (String) v.elementAt(i);
+- }
+- return s;
+- }
+-
+- public static String[] findBooleanSetters(Class c) {
+- Method m[] = findMethods(c);
+- if (m == null)
+- return null;
+- Vector v = new Vector();
+- for (int i = 0; i < m.length; i++) {
+- if (m[i].getName().startsWith("set")
+- && m[i].getParameterTypes().length == 1
+- &&
"boolean".equalsIgnoreCase(m[i].getParameterTypes()[0]
+- .getName())) {
+- String arg = m[i].getName().substring(3);
+- v.addElement(unCapitalize(arg));
+- }
+- }
+- String s[] = new String[v.size()];
+- for (int i = 0; i < s.length; i++) {
+- s[i] = (String) v.elementAt(i);
+- }
+- return s;
+- }
+-
+- static Hashtable objectMethods = new Hashtable();
+-
+- public static Method[] findMethods(Class c) {
+- Method methods[] = (Method[]) objectMethods.get(c);
+- if (methods != null)
+- return methods;
+-
+- methods = c.getMethods();
+- objectMethods.put(c, methods);
+- return methods;
+- }
+-
+- public static Method findMethod(Class c, String name, Class params[]) {
+- Method methods[] = findMethods(c);
+- if (methods == null)
+- return null;
+- for (int i = 0; i < methods.length; i++) {
+- if (methods[i].getName().equals(name)) {
+- Class methodParams[] = methods[i].getParameterTypes();
+- if (methodParams == null)
+- if (params == null || params.length == 0)
+- return methods[i];
+- if (params == null)
+- if (methodParams == null || methodParams.length == 0)
+- return methods[i];
+- if (params.length != methodParams.length)
+- continue;
+- boolean found = true;
+- for (int j = 0; j < params.length; j++) {
+- if (params[j] != methodParams[j]) {
+- found = false;
+- break;
+- }
+- }
+- if (found)
+- return methods[i];
+- }
+- }
+- return null;
+- }
+-
+- /** Test if the object implements a particular
+- * method
+- */
+- public static boolean hasHook(Object obj, String methodN) {
+- try {
+- Method myMethods[] = findMethods(obj.getClass());
+- for (int i = 0; i < myMethods.length; i++) {
+- if (methodN.equals(myMethods[i].getName())) {
+- // check if it's overriden
+- Class declaring = myMethods[i].getDeclaringClass();
+- Class parentOfDeclaring = declaring.getSuperclass();
+- // this works only if the base class doesn't extend
+- // another class.
+-
+- // if the method is declared in a top level class
+- // like BaseInterceptor parent is Object, otherwise
+- // parent is BaseInterceptor or an intermediate class
+- if
(!"java.lang.Object".equals(parentOfDeclaring.getName())) {
+- return true;
+- }
+- }
+- }
+- } catch (Exception ex) {
+- ex.printStackTrace();
+- }
+- return false;
+- }
+-
+- public static void callMain(Class c, String args[]) throws Exception {
+- Class p[] = new Class[1];
+- p[0] = args.getClass();
+- Method m = c.getMethod("main", p);
+- m.invoke(c, new Object[] { args });
+- }
+-
+- public static Object callMethod1(Object target, String methodN,
+- Object param1, String typeParam1, ClassLoader cl) throws Exception {
+- if (target == null || param1 == null) {
+- d("Assert: Illegal params " + target + " " + param1);
+- }
+- if (dbg > 0)
+- d("callMethod1 " + target.getClass().getName() + " "
+- + param1.getClass().getName() + " " + typeParam1);
+-
+- Class params[] = new Class[1];
+- if (typeParam1 == null)
+- params[0] = param1.getClass();
+- else
+- params[0] = cl.loadClass(typeParam1);
+- Method m = findMethod(target.getClass(), methodN, params);
+- if (m == null)
+- throw new NoSuchMethodException(target.getClass().getName() + " "
+- + methodN);
+- return m.invoke(target, new Object[] { param1 });
+- }
+-
+- public static Object callMethod0(Object target, String methodN)
+- throws Exception {
+- if (target == null) {
+- d("Assert: Illegal params " + target);
+- return null;
+- }
+- if (dbg > 0)
+- d("callMethod0 " + target.getClass().getName() + "." + methodN);
+-
+- Class params[] = new Class[0];
+- Method m = findMethod(target.getClass(), methodN, params);
+- if (m == null)
+- throw new NoSuchMethodException(target.getClass().getName() + " "
+- + methodN);
+- return m.invoke(target, emptyArray);
+- }
+-
+- static Object[] emptyArray = new Object[] {};
+-
+- public static Object callMethodN(Object target, String methodN,
+- Object params[], Class typeParams[]) throws Exception {
+- Method m = null;
+- m = findMethod(target.getClass(), methodN, typeParams);
+- if (m == null) {
+- d("Can't find method " + methodN + " in " + target + " CLASS "
+- + target.getClass());
+- return null;
+- }
+- Object o = m.invoke(target, params);
+-
+- if (dbg > 0) {
+- // debug
+- StringBuffer sb = new StringBuffer();
+- sb.append("" + target.getClass().getName() + "." + methodN + "( ");
+- for (int i = 0; i < params.length; i++) {
+- if (i > 0)
+- sb.append(", ");
+- sb.append(params[i]);
+- }
+- sb.append(")");
+- d(sb.toString());
+- }
+- return o;
+- }
+-
+- public static Object convert(String object, Class paramType) {
+- Object result = null;
+- if ("java.lang.String".equals(paramType.getName())) {
+- result = object;
+- } else if ("java.lang.Integer".equals(paramType.getName())
+- || "int".equals(paramType.getName())) {
+- try {
+- result = new Integer(object);
+- } catch (NumberFormatException ex) {
+- }
+- // Try a setFoo ( boolean )
+- } else if ("java.lang.Boolean".equals(paramType.getName())
+- || "boolean".equals(paramType.getName())) {
+- result = new Boolean(object);
+-
+- // Try a setFoo ( InetAddress )
+- } else if ("java.net.InetAddress".equals(paramType
+- .getName())) {
+- try {
+- result = InetAddress.getByName(object);
+- } catch (UnknownHostException exc) {
+- d("Unable to resolve host name:" + object);
+- }
+-
+- // Unknown type
+- } else {
+- d("Unknown type " + paramType.getName());
+- }
+- if (result == null) {
+- throw new IllegalArgumentException("Can't convert argument: " + object);
+- }
+- return result;
+- }
+-
+- // -------------------- Get property --------------------
+- // This provides a layer of abstraction
+-
+- public static interface PropertySource {
+-
+- public String getProperty(String key);
+-
+- }
+-
+- public static interface AttributeHolder {
+-
+- public void setAttribute(String key, Object o);
+-
+- }
+-
+- // debug --------------------
+- static final int dbg = 0;
+-
+- static void d(String s) {
+- if (log.isDebugEnabled())
+- log.debug("IntrospectionUtils: " + s);
+- }
+-}
+Index: test/org/apache/catalina/tribes/demos/MapDemo.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/MapDemo.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/MapDemo.java (working copy)
+@@ -1,497 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.demos;
+-
+-import java.io.Serializable;
+-import java.util.Map;
+-
+-import java.awt.ComponentOrientation;
+-import java.awt.Dimension;
+-import java.awt.event.ActionEvent;
+-import java.awt.event.ActionListener;
+-import java.awt.event.MouseAdapter;
+-import java.awt.event.MouseEvent;
+-import javax.swing.BoxLayout;
+-import javax.swing.JButton;
+-import javax.swing.JFrame;
+-import javax.swing.JPanel;
+-import javax.swing.JScrollPane;
+-import javax.swing.JTable;
+-import javax.swing.JTextField;
+-import javax.swing.table.AbstractTableModel;
+-import javax.swing.table.TableModel;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ChannelListener;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.MembershipListener;
+-import org.apache.catalina.tribes.tipis.AbstractReplicatedMap;
+-import org.apache.catalina.tribes.tipis.LazyReplicatedMap;
+-import javax.swing.table.DefaultTableCellRenderer;
+-import java.awt.Color;
+-import java.awt.Component;
+-import javax.swing.table.TableColumn;
+-import org.apache.catalina.tribes.util.UUIDGenerator;
+-import org.apache.catalina.tribes.util.Arrays;
+-import java.util.Set;
+-import java.util.Random;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class MapDemo implements ChannelListener, MembershipListener{
+-
+- protected LazyReplicatedMap map;
+- protected SimpleTableDemo table;
+-
+- public MapDemo(Channel channel, String mapName ) {
+- map = new LazyReplicatedMap(null,channel,5000, mapName,null);
+- table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
+- channel.addChannelListener(this);
+- channel.addMembershipListener(this);
+-// for ( int i=0; i<1000; i++ ) {
+-// map.put("MyKey-"+i,"My String Value-"+i);
+-// }
+- this.messageReceived(null,null);
+- }
+-
+- public boolean accept(Serializable msg, Member source) {
+- table.dataModel.getValueAt(-1,-1);
+- return false;
+- }
+-
+- public void messageReceived(Serializable msg, Member source) {
+-
+- }
+-
+- public void memberAdded(Member member) {
+- }
+- public void memberDisappeared(Member member) {
+- table.dataModel.getValueAt(-1,-1);
+- }
+-
+- public static void usage() {
+- System.out.println("Tribes MapDemo.");
+- System.out.println("Usage:\n\t" +
+- "java MapDemo [channel options] mapName\n\t" +
+- "\tChannel options:" +
+- ChannelCreator.usage());
+- }
+-
+- public static void main(String[] args) throws Exception {
+- long start = System.currentTimeMillis();
+- ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args);
+- String mapName = "MapDemo";
+- if ( args.length > 0 && (!args[args.length-1].startsWith("-"))) {
+- mapName = args[args.length-1];
+- }
+- channel.start(channel.DEFAULT);
+- Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+- MapDemo demo = new MapDemo(channel,mapName);
+-
+- System.out.println("System test complete, time to start="+(System.currentTimeMillis()-start)+" ms. Sleeping to let threads finish.");
+- Thread.sleep(60 * 1000 * 60);
+- }
+-
+- public static class Shutdown
+- extends Thread {
+- ManagedChannel channel = null;
+- public Shutdown(ManagedChannel channel) {
+- this.channel = channel;
+- }
+-
+- public void run() {
+- System.out.println("Shutting down...");
+- SystemExit exit = new SystemExit(5000);
+- exit.setDaemon(true);
+- exit.start();
+- try {
+- channel.stop(channel.DEFAULT);
+-
+- } catch (Exception x) {
+- x.printStackTrace();
+- }
+- System.out.println("Channel stopped.");
+- }
+- }
+-
+- public static class SystemExit
+- extends Thread {
+- private long delay;
+- public SystemExit(long delay) {
+- this.delay = delay;
+- }
+-
+- public void run() {
+- try {
+- Thread.sleep(delay);
+- } catch (Exception x) {
+- x.printStackTrace();
+- }
+- System.exit(0);
+-
+- }
+- }
+-
+- public static class SimpleTableDemo
+- extends JPanel implements ActionListener{
+- private static int WIDTH = 550;
+-
+- private LazyReplicatedMap map;
+- private boolean DEBUG = false;
+- AbstractTableModel dataModel = new AbstractTableModel() {
+-
+-
+- String[] columnNames = {
+- "Rownum",
+- "Key",
+- "Value",
+- "Primary Node",
+- "Backup Node",
+- "isPrimary",
+- "isProxy",
+- "isBackup"};
+-
+- public int getColumnCount() { return columnNames.length; }
+-
+- public int getRowCount() {return map.sizeFull() +1; }
+-
+- public StringBuffer getMemberNames(Member[] members){
+- StringBuffer buf = new StringBuffer();
+- if ( members!=null ) {
+- for (int i=0;i<members.length; i++ ) {
+- buf.append(members[i].getName());
+- buf.append("; ");
+- }
+- }
+- return buf;
+- }
+-
+- public Object getValueAt(int row, int col) {
+- if ( row==-1 ) {
+- update();
+- return "";
+- }
+- if ( row == 0 ) return columnNames[col];
+- Object[] keys = map.keySetFull().toArray();
+- String key = (String)keys [row-1];
+- LazyReplicatedMap.MapEntry entry = map.getInternal(key);
+- switch (col) {
+- case 0: return String.valueOf(row);
+- case 1: return entry.getKey();
+- case 2: return entry.getValue();
+- case 3: return entry.getPrimary()!=null?entry.getPrimary().getName():"null";
+- case 4: return getMemberNames(entry.getBackupNodes());
+- case 5: return new Boolean(entry.isPrimary());
+- case 6: return new Boolean(entry.isProxy());
+- case 7: return new Boolean(entry.isBackup());
+- default: return "";
+- }
+-
+- }
+-
+- public void update() {
+- fireTableDataChanged();
+- }
+- };
+-
+- JTextField txtAddKey = new JTextField(20);
+- JTextField txtAddValue = new JTextField(20);
+- JTextField txtRemoveKey = new JTextField(20);
+- JTextField txtChangeKey = new JTextField(20);
+- JTextField txtChangeValue = new JTextField(20);
+-
+- JTable table = null;
+- public SimpleTableDemo(LazyReplicatedMap map) {
+- super();
+- this.map = map;
+-
+- this.setComponentOrientation(ComponentOrientation.LEFT_TO_RIGHT);
+-
+- //final JTable table = new JTable(data, columnNames);
+- table = new JTable(dataModel);
+-
+- table.setPreferredScrollableViewportSize(new Dimension(WIDTH, 150));
+- for ( int i=0; i<table.getColumnCount(); i++ ) {
+- TableColumn tm = table.getColumnModel().getColumn(i);
+- tm.setCellRenderer(new ColorRenderer());
+- }
+-
+-
+- if (DEBUG) {
+- table.addMouseListener(new MouseAdapter() {
+- public void mouseClicked(MouseEvent e) {
+- printDebugData(table);
+- }
+- });
+- }
+-
+- //setLayout(new GridLayout(5, 0));
+- setLayout(new BoxLayout(this, BoxLayout.Y_AXIS));
+-
+- //Create the scroll pane and add the table to it.
+- JScrollPane scrollPane = new JScrollPane(table);
+-
+- //Add the scroll pane to this panel.
+- add(scrollPane);
+-
+- //create a add value button
+- JPanel addpanel = new JPanel();
+- addpanel.setPreferredSize(new Dimension(WIDTH,30));
+- addpanel.add(createButton("Add","add"));
+- addpanel.add(txtAddKey);
+- addpanel.add(txtAddValue);
+- addpanel.setMaximumSize(new Dimension(WIDTH,30));
+- add(addpanel);
+-
+- //create a remove value button
+- JPanel removepanel = new JPanel( );
+- removepanel.setPreferredSize(new Dimension(WIDTH,30));
+- removepanel.add(createButton("Remove","remove"));
+- removepanel.add(txtRemoveKey);
+- removepanel.setMaximumSize(new Dimension(WIDTH,30));
+- add(removepanel);
+-
+- //create a change value button
+- JPanel changepanel = new JPanel( );
+- changepanel.add(createButton("Change","change"));
+- changepanel.add(txtChangeKey);
+- changepanel.add(txtChangeValue);
+- changepanel.setPreferredSize(new Dimension(WIDTH,30));
+- changepanel.setMaximumSize(new Dimension(WIDTH,30));
+- add(changepanel);
+-
+-
+- //create sync button
+- JPanel syncpanel = new JPanel( );
+- syncpanel.add(createButton("Synchronize","sync"));
+- syncpanel.add(createButton("Replicate","replicate"));
+- syncpanel.add(createButton("Random","random"));
+- syncpanel.setPreferredSize(new Dimension(WIDTH,30));
+- syncpanel.setMaximumSize(new Dimension(WIDTH,30));
+- add(syncpanel);
+-
+-
+- }
+-
+- public JButton createButton(String text, String command) {
+- JButton button = new JButton(text);
+- button.setActionCommand(command);
+- button.addActionListener(this);
+- return button;
+- }
+-
+- public void actionPerformed(ActionEvent e) {
+- System.out.println(e.getActionCommand());
+- if ( "add".equals(e.getActionCommand()) ) {
+- System.out.println("Add key:"+txtAddKey.getText()+" value:"+txtAddValue.getText());
+- map.put(txtAddKey.getText(),new StringBuffer(txtAddValue.getText()));
+- }
+- if ( "change".equals(e.getActionCommand()) ) {
+- System.out.println("Change key:"+txtChangeKey.getText()+" value:"+txtChangeValue.getText());
+- StringBuffer buf = (StringBuffer)map.get(txtChangeKey.getText());
+- if ( buf!=null ) {
+- buf.delete(0,buf.length());
+- buf.append(txtChangeValue.getText());
+- map.replicate(txtChangeKey.getText(),true);
+- } else {
+- buf = new StringBuffer();
+- buf.append(txtChangeValue.getText());
+- map.put(txtChangeKey.getText(),buf);
+- }
+- }
+- if ( "remove".equals(e.getActionCommand()) ) {
+- System.out.println("Remove key:"+txtRemoveKey.getText());
+- map.remove(txtRemoveKey.getText());
+- }
+- if ( "sync".equals(e.getActionCommand()) ) {
+- System.out.println("Syncing from another node.");
+- map.transferState();
+- }
+- if ( "random".equals(e.getActionCommand()) ) {
+- Thread t = new Thread() {
+- public void run() {
+- for (int i = 0; i < 5; i++) {
+- String key = random(5,0,0,true,true,null);
+- map.put(key, new StringBuffer(key));
+- dataModel.fireTableDataChanged();
+- table.paint(table.getGraphics());
+- try {
+- Thread.sleep(500);
+- } catch (InterruptedException x) {
+- Thread.currentThread().interrupted();
+- }
+- }
+- }
+- };
+- t.start();
+- }
+-
+- if ( "replicate".equals(e.getActionCommand()) ) {
+- System.out.println("Replicating out to the other nodes.");
+- map.replicate(true);
+- }
+- dataModel.getValueAt(-1,-1);
+- }
+-
+- public static Random random = new Random(System.currentTimeMillis());
+- public static String random(int count, int start, int end, boolean letters, boolean numbers,
+- char[] chars ) {
+- if (count == 0) {
+- return "";
+- } else if (count < 0) {
+- throw new IllegalArgumentException("Requested random string length " + count + " is less than 0.");
+- }
+- if ((start == 0) && (end == 0)) {
+- end = 'z' + 1;
+- start = ' ';
+- if (!letters && !numbers) {
+- start = 0;
+- end = Integer.MAX_VALUE;
+- }
+- }
+-
+- char[] buffer = new char[count];
+- int gap = end - start;
+-
+- while (count-- != 0) {
+- char ch;
+- if (chars == null) {
+- ch = (char) (random.nextInt(gap) + start);
+- } else {
+- ch = chars[random.nextInt(gap) + start];
+- }
+- if ((letters && Character.isLetter(ch))
+- || (numbers && Character.isDigit(ch))
+- || (!letters && !numbers))
+- {
+- if(ch >= 56320 && ch <= 57343) {
+- if(count == 0) {
+- count++;
+- } else {
+- // low surrogate, insert high surrogate after putting it in
+- buffer[count] = ch;
+- count--;
+- buffer[count] = (char) (55296 + random.nextInt(128));
+- }
+- } else if(ch >= 55296 && ch <= 56191) {
+- if(count == 0) {
+- count++;
+- } else {
+- // high surrogate, insert low surrogate before putting it in
+- buffer[count] = (char) (56320 + random.nextInt(128));
+- count--;
+- buffer[count] = ch;
+- }
+- } else if(ch >= 56192 && ch <= 56319) {
+- // private high surrogate, no effing clue, so skip it
+- count++;
+- } else {
+- buffer[count] = ch;
+- }
+- } else {
+- count++;
+- }
+- }
+- return new String(buffer);
+- }
+-
+- private void printDebugData(JTable table) {
+- int numRows = table.getRowCount();
+- int numCols = table.getColumnCount();
+- javax.swing.table.TableModel model = table.getModel();
+-
+- System.out.println("Value of data: ");
+- for (int i = 0; i < numRows; i++) {
+- System.out.print(" row " + i + ":");
+- for (int j = 0; j < numCols; j++) {
+- System.out.print(" " + model.getValueAt(i, j));
+- }
+- System.out.println();
+- }
+- System.out.println("--------------------------");
+- }
+-
+- /**
+- * Create the GUI and show it. For thread safety,
+- * this method should be invoked from the
+- * event-dispatching thread.
+- */
+- public static SimpleTableDemo createAndShowGUI(LazyReplicatedMap map, String title) {
+- //Make sure we have nice window decorations.
+- JFrame.setDefaultLookAndFeelDecorated(true);
+-
+- //Create and set up the window.
+- JFrame frame = new JFrame("SimpleTableDemo - "+title);
+- frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+-
+- //Create and set up the content pane.
+- SimpleTableDemo newContentPane = new SimpleTableDemo(map);
+- newContentPane.setOpaque(true); //content panes must be opaque
+- frame.setContentPane(newContentPane);
+-
+- //Display the window.
+- frame.setSize(450,250);
+- newContentPane.setSize(450,300);
+- frame.pack();
+- frame.setVisible(true);
+- return newContentPane;
+- }
+- }
+-
+- static class ColorRenderer extends DefaultTableCellRenderer {
+-
+- public ColorRenderer() {
+- super();
+- }
+-
+- public Component getTableCellRendererComponent
+- (JTable table, Object value, boolean isSelected,
+- boolean hasFocus, int row, int column) {
+- Component cell = super.getTableCellRendererComponent
+- (table, value, isSelected, hasFocus, row, column);
+- cell.setBackground(Color.WHITE);
+- if ( row > 0 ) {
+- Color color = null;
+- boolean primary = ( (Boolean) table.getValueAt(row, 5)).booleanValue();
+- boolean proxy = ( (Boolean) table.getValueAt(row, 6)).booleanValue();
+- boolean backup = ( (Boolean) table.getValueAt(row, 7)).booleanValue();
+- if (primary) color = Color.GREEN;
+- else if (proxy) color = Color.RED;
+- else if (backup) color = Color.BLUE;
+- if ( color != null ) cell.setBackground(color);
+- }
+-// System.out.println("Row:"+row+" Column:"+column+" Color:"+cell.getBackground());
+-// cell.setBackground(bkgndColor);
+-// cell.setForeground(fgndColor);
+-
+- return cell;
+- }
+-
+-
+- }
+-
+-
+-}
+Index: test/org/apache/catalina/tribes/demos/EchoRpcTest.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/EchoRpcTest.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/EchoRpcTest.java (working copy)
+@@ -1,218 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.demos;
+-
+-import java.io.Serializable;
+-
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.RpcCallback;
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.group.RpcChannel;
+-import org.apache.catalina.tribes.group.Response;
+-
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- * <p>Company: </p>
+- *
+- * @author not attributable
+- * @version 1.0
+- */
+-public class EchoRpcTest implements RpcCallback, Runnable {
+-
+- Channel channel;
+- int count;
+- String message;
+- long pause;
+- RpcChannel rpc;
+- int options;
+- long timeout;
+- String name;
+-
+- public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) {
+- this.channel = channel;
+- this.count = count;
+- this.message = message;
+- this.pause = pause;
+- this.options = options;
+- this.rpc = new RpcChannel(name.getBytes(),channel,this);
+- this.timeout = timeout;
+- this.name = name;
+- }
+-
+- /**
+- * If the reply has already been sent to the requesting thread, the rpc
+- * callback can handle any data that comes in after the fact.
+- *
+- * @param msg Serializable
+- * @param sender Member
+- * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+- * method
+- */
+- public void leftOver(Serializable msg, Member sender) {
+- System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]");
+- }
+-
+- /**
+- *
+- * @param msg Serializable
+- * @param sender Member
+- * @return Serializable - null if no reply should be sent
+- * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+- * method
+- */
+- public Serializable replyRequest(Serializable msg, Member sender) {
+- System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]");
+- return "Reply("+name+"):"+msg;
+- }
+-
+- public void run() {
+- long counter = 0;
+- while (counter<count) {
+- String msg = message + " cnt="+(++counter);
+- try {
+- System.out.println("Sending ["+msg+"]");
+- long start = System.currentTimeMillis();
+- Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,Channel.SEND_OPTIONS_DEFAULT,timeout);
+- System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
+- for ( int i=0; i<resp.length; i++ ) {
+- System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");
+- }
+- Thread.sleep(pause);
+- }catch(Exception x){}
+- }
+- }
+-
+- public static void usage() {
+- System.out.println("Tribes RPC tester.");
+- System.out.println("Usage:\n\t"+
+- "java EchoRpcTest [options]\n\t"+
+- "Options:\n\t\t"+
+- "[-mode all|first|majority] \n\t\t"+
+- "[-debug] \n\t\t"+
+- "[-count messagecount] \n\t\t"+
+- "[-timeout timeoutinms] \n\t\t"+
+- "[-stats statinterval] \n\t\t"+
+- "[-pause nrofsecondstopausebetweensends] \n\t\t"+
+- "[-message message] \n\t\t"+
+- "[-name rpcname] \n\t\t"+
+- "[-break (halts execution on exception)]\n"+
+- "\tChannel options:"+
+- ChannelCreator.usage()+"\n\n"+
+- "Example:\n\t"+
+- "java EchoRpcTest -port 4004\n\t"+
+- "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+
+- "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+- }
+-
+- public static void main(String[] args) throws Exception {
+- boolean send = true;
+- boolean debug = false;
+- long pause = 3000;
+- int count = 1000000;
+- int stats = 10000;
+- String name = "EchoRpcId";
+- boolean breakOnEx = false;
+- int threads = 1;
+- int options = RpcChannel.ALL_REPLY;
+- long timeout = 15000;
+- String message = "EchoRpcMessage";
+- if ( args.length == 0 ) {
+- args = new String[] {"-help"};
+- }
+- for (int i = 0; i < args.length; i++) {
+- if ("-threads".equals(args[i])) {
+- threads = Integer.parseInt(args[++i]);
+- } else if ("-count".equals(args[i])) {
+- count = Integer.parseInt(args[++i]);
+- System.out.println("Sending "+count+" messages.");
+- } else if ("-pause".equals(args[i])) {
+- pause = Long.parseLong(args[++i])*1000;
+- } else if ("-break".equals(args[i])) {
+- breakOnEx = true;
+- } else if ("-stats".equals(args[i])) {
+- stats = Integer.parseInt(args[++i]);
+- System.out.println("Stats every "+stats+" message");
+- } else if ("-timeout".equals(args[i])) {
+- timeout = Long.parseLong(args[++i]);
+- } else if ("-message".equals(args[i])) {
+- message = args[++i];
+- } else if ("-name".equals(args[i])) {
+- name = args[++i];
+- } else if ("-mode".equals(args[i])) {
+- if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY;
+- else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY;
+- else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY;
+- } else if ("-debug".equals(args[i])) {
+- debug = true;
+- } else if ("-help".equals(args[i]))
+- {
+- usage();
+- System.exit(1);
+- }
+- }
+-
+-
+- ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+- EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout);
+- channel.start(channel.DEFAULT);
+- Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+- test.run();
+-
+- System.out.println("System test complete, sleeping to let threads finish.");
+- Thread.sleep(60*1000*60);
+- }
+-
+- public static class Shutdown extends Thread {
+- ManagedChannel channel = null;
+- public Shutdown(ManagedChannel channel) {
+- this.channel = channel;
+- }
+-
+- public void run() {
+- System.out.println("Shutting down...");
+- SystemExit exit = new SystemExit(5000);
+- exit.setDaemon(true);
+- exit.start();
+- try {
+- channel.stop(channel.DEFAULT);
+-
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- }
+- System.out.println("Channel stopped.");
+- }
+- }
+- public static class SystemExit extends Thread {
+- private long delay;
+- public SystemExit(long delay) {
+- this.delay = delay;
+- }
+- public void run () {
+- try {
+- Thread.sleep(delay);
+- }catch ( Exception x ) {
+- x.printStackTrace();
+- }
+- System.exit(0);
+-
+- }
+- }}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/demos/ChannelCreator.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/ChannelCreator.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/ChannelCreator.java (working copy)
+@@ -1,254 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.demos;
+-
+-import java.util.Iterator;
+-import java.util.Properties;
+-
+-import org.apache.catalina.tribes.Channel;
+-import org.apache.catalina.tribes.ManagedChannel;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.group.interceptors.FragmentationInterceptor;
+-import org.apache.catalina.tribes.group.interceptors.GzipInterceptor;
+-import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+-import org.apache.catalina.tribes.membership.McastService;
+-import org.apache.catalina.tribes.transport.MultiPointSender;
+-import org.apache.catalina.tribes.transport.ReceiverBase;
+-import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+-import org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor;
+-import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+-import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+-import java.util.ArrayList;
+-import org.apache.catalina.tribes.membership.MemberImpl;
+-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+-import org.apache.catalina.tribes.Member;
+-
+-/**
+- * <p>Title: </p>
+- *
+- * <p>Description: </p>
+- *
+- *
+- * <p>Company: </p>
+- *
+- * @author fhanik
+- * @version 1.0
+- */
+-public class ChannelCreator {
+-
+-
+- public static StringBuffer usage() {
+- StringBuffer buf = new StringBuffer();
+- buf.append("\n\t\t[-bind tcpbindaddress]")
+- .append("\n\t\t[-tcpselto tcpselectortimeout]")
+- .append("\n\t\t[-tcpthreads tcpthreadcount]")
+- .append("\n\t\t[-port tcplistenport]")
+- .append("\n\t\t[-autobind tcpbindtryrange]")
+- .append("\n\t\t[-ackto acktimeout]")
+- .append("\n\t\t[-receiver org.apache.catalina.tribes.transport.nio.NioReceiver|org.apache.catalina.tribes.transport.bio.BioReceiver|]")
+- .append("\n\t\t[-transport org.apache.catalina.tribes.transport.nio.PooledParallelSender|org.apache.catalina.tribes.transport.bio.PooledMultiSender]")
+- .append("\n\t\t[-transport.xxx transport specific property]")
+- .append("\n\t\t[-maddr multicastaddr]")
+- .append("\n\t\t[-mport multicastport]")
+- .append("\n\t\t[-mbind multicastbindaddr]")
+- .append("\n\t\t[-mfreq multicastfrequency]")
+- .append("\n\t\t[-mdrop multicastdroptime]")
+- .append("\n\t\t[-gzip]")
+- .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]")
+- .append("\n\t\t[-order]")
+- .append("\n\t\t[-ordersize maxorderqueuesize]")
+- .append("\n\t\t[-frag]")
+- .append("\n\t\t[-fragsize maxmsgsize]")
+- .append("\n\t\t[-throughput]")
+- .append("\n\t\t[-failuredetect]")
+- .append("\n\t\t[-async]")
+- .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]");
+- return buf;
+-
+- }
+-
+- public static Channel createChannel(String[] args) throws Exception {
+- String bind = "auto";
+- int port = 4001;
+- String mbind = null;
+- boolean gzip = false;
+- int tcpseltimeout = 5000;
+- int tcpthreadcount = 4;
+- int acktimeout = 15000;
+- String mcastaddr = "228.0.0.5";
+- int mcastport = 45565;
+- long mcastfreq = 500;
+- long mcastdrop = 2000;
+- boolean order = false;
+- int ordersize = Integer.MAX_VALUE;
+- boolean frag = false;
+- int fragsize = 1024;
+- int autoBind = 10;
+- ArrayList staticMembers = new ArrayList();
+- Properties transportProperties = new Properties();
+- String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender";
+- String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver";
+- boolean async = false;
+- int asyncsize = 1024*1024*50; //50MB
+- boolean throughput = false;
+- boolean failuredetect = false;
+-
+- for (int i = 0; i < args.length; i++) {
+- if ("-bind".equals(args[i])) {
+- bind = args[++i];
+- } else if ("-port".equals(args[i])) {
+- port = Integer.parseInt(args[++i]);
+- } else if ("-autobind".equals(args[i])) {
+- autoBind = Integer.parseInt(args[++i]);
+- } else if ("-tcpselto".equals(args[i])) {
+- tcpseltimeout = Integer.parseInt(args[++i]);
+- } else if ("-tcpthreads".equals(args[i])) {
+- tcpthreadcount = Integer.parseInt(args[++i]);
+- } else if ("-gzip".equals(args[i])) {
+- gzip = true;
+- } else if ("-async".equals(args[i])) {
+- async = true;
+- } else if ("-failuredetect".equals(args[i])) {
+- failuredetect = true;
+- } else if ("-asyncsize".equals(args[i])) {
+- asyncsize = Integer.parseInt(args[++i]);
+- System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
+- } else if ("-static".equals(args[i])) {
+- String d = args[++i];
+- String h = d.substring(0,d.indexOf(":"));
+- String p = d.substring(h.length()+1);
+- MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000);
+- staticMembers.add(m);
+- } else if ("-throughput".equals(args[i])) {
+- throughput = true;
+- } else if ("-order".equals(args[i])) {
+- order = true;
+- } else if ("-ordersize".equals(args[i])) {
+- ordersize = Integer.parseInt(args[++i]);
+- System.out.println("Setting OrderInterceptor.maxQueue="+ordersize);
+- } else if ("-frag".equals(args[i])) {
+- frag = true;
+- } else if ("-fragsize".equals(args[i])) {
+- fragsize = Integer.parseInt(args[++i]);
+- System.out.println("Setting FragmentationInterceptor.maxSize="+fragsize);
+- } else if ("-ackto".equals(args[i])) {
+- acktimeout = Integer.parseInt(args[++i]);
+- } else if ("-transport".equals(args[i])) {
+- transport = args[++i];
+- } else if (args[i]!=null && args[i].startsWith("transport.")) {
+- String key = args[i];
+- String val = args[++i];
+- transportProperties.setProperty(key,val);
+- } else if ("-receiver".equals(args[i])) {
+- receiver = args[++i];
+- } else if ("-maddr".equals(args[i])) {
+- mcastaddr = args[++i];
+- } else if ("-mport".equals(args[i])) {
+- mcastport = Integer.parseInt(args[++i]);
+- } else if ("-mfreq".equals(args[i])) {
+- mcastfreq = Long.parseLong(args[++i]);
+- } else if ("-mdrop".equals(args[i])) {
+- mcastdrop = Long.parseLong(args[++i]);
+- } else if ("-mbind".equals(args[i])) {
+- mbind = args[++i];
+- }
+- }
+-
+- System.out.println("Creating receiver class="+receiver);
+- Class cl = Class.forName(receiver,true,ChannelCreator.class.getClassLoader());
+- ReceiverBase rx = (ReceiverBase)cl.newInstance();
+- rx.setTcpListenAddress(bind);
+- rx.setTcpListenPort(port);
+- rx.setTcpSelectorTimeout(tcpseltimeout);
+- rx.setTcpThreadCount(tcpthreadcount);
+- rx.getBind();
+- rx.setRxBufSize(43800);
+- rx.setTxBufSize(25188);
+- rx.setAutoBind(autoBind);
+-
+-
+- ReplicationTransmitter ps = new ReplicationTransmitter();
+- System.out.println("Creating transport class="+transport);
+- MultiPointSender sender =
(MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance();
+- sender.setTimeout(acktimeout);
+- sender.setMaxRetryAttempts(2);
+- sender.setRxBufSize(43800);
+- sender.setTxBufSize(25188);
+-
+- Iterator i = transportProperties.keySet().iterator();
+- while ( i.hasNext() ) {
+- String key = (String)i.next();
+-
IntrospectionUtils.setProperty(sender,key,transportProperties.getProperty(key));
+- }
+- ps.setTransport(sender);
+-
+- McastService service = new McastService();
+- service.setMcastAddr(mcastaddr);
+- if (mbind != null) service.setMcastBindAddress(mbind);
+- service.setMcastFrequency(mcastfreq);
+- service.setMcastDropTime(mcastdrop);
+- service.setMcastPort(mcastport);
+-
+- ManagedChannel channel = new GroupChannel();
+- channel.setChannelReceiver(rx);
+- channel.setChannelSender(ps);
+- channel.setMembershipService(service);
+-
+- if ( throughput ) channel.addInterceptor(new ThroughputInterceptor());
+- if (gzip) channel.addInterceptor(new GzipInterceptor());
+- if ( frag ) {
+- FragmentationInterceptor fi = new FragmentationInterceptor();
+- fi.setMaxSize(fragsize);
+- channel.addInterceptor(fi);
+- }
+- if (order) {
+- OrderInterceptor oi = new OrderInterceptor();
+- oi.setMaxQueue(ordersize);
+- channel.addInterceptor(oi);
+- }
+-
+- if ( async ) {
+- MessageDispatchInterceptor mi = new MessageDispatch15Interceptor();
+- mi.setMaxQueueSize(asyncsize);
+- channel.addInterceptor(mi);
+- System.out.println("Added MessageDispatchInterceptor");
+- }
+-
+- if ( failuredetect ) {
+- TcpFailureDetector tcpfi = new TcpFailureDetector();
+- channel.addInterceptor(tcpfi);
+- }
+- if ( staticMembers.size() > 0 ) {
+- StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+- for (int x=0; x<staticMembers.size(); x++ ) {
+- smi.addStaticMember((Member)staticMembers.get(x));
+- }
+- channel.addInterceptor(smi);
+- }
+-
+-
+- byte[] domain = new byte[] {1,2,3,4,5,6,7,8,9,0};
+- ((McastService)channel.getMembershipService()).setDomain(domain);
+- DomainFilterInterceptor filter = new DomainFilterInterceptor();
+- filter.setDomain(domain);
+- channel.addInterceptor(filter);
+- return channel;
+- }
+-
+-}
+\ No newline at end of file
+Index: test/org/apache/catalina/tribes/demos/CoordinationDemo.java
+===================================================================
+--- test/org/apache/catalina/tribes/demos/CoordinationDemo.java (revision 590752)
++++ test/org/apache/catalina/tribes/demos/CoordinationDemo.java (working copy)
+@@ -1,364 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one or more
+- * contributor license agreements. See the NOTICE file distributed with
+- * this work for additional information regarding copyright ownership.
+- * The ASF licenses this file to You under the Apache License, Version 2.0
+- * (the "License"); you may not use this file except in compliance with
+- * the License. You may obtain a copy of the License at
+- *
+- *
http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.catalina.tribes.demos;
+-
+-import java.io.BufferedReader;
+-import java.io.IOException;
+-import java.io.InputStreamReader;
+-import java.util.StringTokenizer;
+-
+-import org.apache.catalina.tribes.ChannelInterceptor;
+-import org.apache.catalina.tribes.ChannelInterceptor.InterceptorEvent;
+-import org.apache.catalina.tribes.Member;
+-import org.apache.catalina.tribes.group.GroupChannel;
+-import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+-import org.apache.catalina.tribes.transport.ReceiverBase;
+-import org.apache.catalina.tribes.util.Arrays;
+-
+-
+-
+-public class CoordinationDemo {
+- static int CHANNEL_COUNT = 5;
+- static int SCREEN_WIDTH = 120;
+- static long SLEEP_TIME = 10;
+- static int CLEAR_SCREEN = 30;
+- static boolean MULTI_THREAD = false;
+- static boolean[] VIEW_EVENTS = new boolean[255];
+- StringBuffer statusLine = new StringBuffer();
+- Status[] status = null;
+- BufferedReader reader = null;
+- /**
+- * Construct and show the application.
+- */
+- public CoordinationDemo() {
+- }
+-
+- public void init() {
+- reader = new BufferedReader(new InputStreamReader(System.in));
+- status = new Status[CHANNEL_COUNT];
+- }
+-
+-
+- public void clearScreen() {
+- StringBuffer buf = new StringBuffer(700);
+- for (int i=0; i<CLEAR_SCREEN; i++ ) buf.append("\n");
+- System.out.println(buf);
+- }
+-
+- public void printMenuOptions() {
+- System.out.println("Commands:");
+- System.out.println("\tstart [member id]");
+- System.out.println("\tstop [member id]");
+- System.out.println("\tprint (refresh)");
+- System.out.println("\tquit");
+- System.out.print("Enter command:");
+- }
+-
+- public synchronized void printScreen() {
+- clearScreen();
+- System.out.println(" ###."+getHeader());
+- for ( int i=0; i<status.length; i++ ) {
+- System.out.print(leftfill(String.valueOf(i+1)+".",5,"
"));
+- if ( status[i] != null ) System.out.print(status[i].getStatusLine());
+- }
+- System.out.println("\n\n");
+- System.out.println("Overall status:"+statusLine);
+- printMenuOptions();
+-
+- }
+-
+- public String getHeader() {
+- //member - 30
+- //running- 10
+- //coord - 30
+- //view-id - 24
+- //view count - 8
+-
+- StringBuffer buf = new StringBuffer();
+- buf.append(leftfill("Member",30," "));
+- buf.append(leftfill("Running",10," "));
+- buf.append(leftfill("Coord",30," "));
+- buf.append(leftfill("View-id(short)",24," "));
+- buf.append(leftfill("Count",8," "));
+- buf.append("\n");
+-
+- buf.append(rightfill("==="+new
java.sql.Timestamp(System.currentTimeMillis()).toString(),SCREEN_WIDTH,"="));
+- buf.append("\n");
+- return buf.toString();
+- }
+-
+- public String[] tokenize(String line) {
+- StringTokenizer tz = new StringTokenizer(line," ");
+- String[] result = new String[tz.countTokens()];
+- for (int i=0; i<result.length; i++ ) result[i] = tz.nextToken();
+- return result;
+- }
+-
+- public void waitForInput() throws IOException {
+- for ( int i=0; i<status.length; i++ ) status[i] = new Status(this);
+- printScreen();
+- String l = reader.readLine();
+- String[] args = tokenize(l);
+- while ( args.length >= 1 &&
(!"quit".equalsIgnoreCase(args[0]))) {
+- if ("start".equalsIgnoreCase(args[0])) {
+- cmdStart(args);
+- } else if ("stop".equalsIgnoreCase(args[0])) {
+- cmdStop(args);
+-
+- }
+- printScreen();
+- l = reader.readLine();
+- args = tokenize(l);
+- }
+- for ( int i=0; i<status.length; i++ ) status[i].stop();
+- }
+-
+- private void cmdStop(String[] args) {
+- if ( args.length == 1 ) {
+- setSystemStatus("System shutting down...");
+- Thread[] t = new Thread[CHANNEL_COUNT];
+- for (int i = 0; i < status.length; i++) {
+- final int j = i;
+- t[j] = new Thread() {
+- public void run() {
+- status[j].stop();
+- }
+- };
+- }
+- for (int i = 0; i < status.length; i++) if (MULTI_THREAD ) t[i].start();
else t[i].run();
+- setSystemStatus("System stopped.");
+- } else {
+- int index = -1;
+- try { index = Integer.parseInt(args[1])-1;}catch ( Exception x )
{setSystemStatus("Invalid index:"+args[1]);}
+- if ( index >= 0 ) {
+- setSystemStatus("Stopping member:"+(index+1));
+- status[index].stop();
+- setSystemStatus("Member stopped:"+(index+1));
+- }
+- }
+- }
+-
+- private void cmdStart(String[] args) {
+- if ( args.length == 1 ) {
+- setSystemStatus("System starting up...");
+- Thread[] t = new Thread[CHANNEL_COUNT];
+- for (int i = 0; i < status.length; i++) {
+- final int j = i;
+- t[j] = new Thread() {
+- public void run() {
+- status[j].start();
+- }
+- };
+- }
+- for (int i = 0; i < status.length; i++) if (MULTI_THREAD ) t[i].start();
else t[i].run();
+- setSystemStatus("System started.");
+- } else {
+- int index = -1;
+- try { index = Integer.parseInt(args[1])-1;}catch ( Exception x )
{setSystemStatus("Invalid index:"+args[1]);}
+- if ( index >= 0 ) {
+- setSystemStatus("Starting member:"+(index+1));
+- status[index].start();
+- setSystemStatus("Member started:"+(index+1));
+- }
+- }
+- }
+-
+- public void setSystemStatus(String status) {
+- statusLine.delete(0,statusLine.length());
+- statusLine.append(status);
+- }
+-
+-
+-
+- public static void setEvents(String events) {
+- java.util.Arrays.fill(VIEW_EVENTS,false);
+- StringTokenizer t = new StringTokenizer(events,",");
+- while (t.hasMoreTokens() ) {
+- int idx = Integer.parseInt(t.nextToken());
+- VIEW_EVENTS[idx] = true;
+- }
+- }
+-
+- public static void run(String[] args,CoordinationDemo demo) throws Exception {
+- usage();
+- java.util.Arrays.fill(VIEW_EVENTS,true);
+-
+- for (int i=0; i<args.length; i++ ) {
+- if ( "-c".equals(args[i]) )
+- CHANNEL_COUNT = Integer.parseInt(args[++i]);
+- else if ( "-t".equals(args[i]) )
+- MULTI_THREAD = Boolean.parseBoolean(args[++i]);
+- else if ( "-s".equals(args[i]) )
+- SLEEP_TIME = Long.parseLong(args[++i]);
+- else if ( "-sc".equals(args[i]) )
+- CLEAR_SCREEN = Integer.parseInt(args[++i]);
+- else if ( "-p".equals(args[i]) )
+- setEvents(args[++i]);
+- else if ( "-h".equals(args[i]) ) System.exit(0);
+- }
+- demo.init();
+- demo.waitForInput();
+- }
+-
+- private static void usage() {
+- System.out.println("Usage:");
+- System.out.println("\tjava
org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t
multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p
view_events_csv(1,2,5,7)");
+- System.out.println("Example:");
+- System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo
single threaded start/stop with 5 channels");
+- System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts
demo single threaded start/stop with 10 channels");
+- System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000
-sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time
between events and 50 lines to clear screen");
+- System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 ->
starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX
event");
+- System.out.println();
+- }
+- public static void main(String[] args) throws Exception {
+- CoordinationDemo demo = new CoordinationDemo();
+- run(args,demo);
+- }
+-
+- public static String leftfill(String value, int length, String ch) {
+- return fill(value,length,ch,true);
+- }
+-
+- public static String rightfill(String value, int length, String ch) {
+- return fill(value,length,ch,false);
+- }
+-
+- public static String fill(String value, int length, String ch, boolean left) {
+- StringBuffer buf = new StringBuffer();
+- if ( !left ) buf.append(value.trim());
+- for (int i=value.trim().length(); i<length; i++ ) buf.append(ch);
+- if ( left ) buf.append(value.trim());
+- return buf.toString();
+- }
+-
+-
+- public static class Status {
+- public CoordinationDemo parent;
+- public GroupChannel channel;
+- NonBlockingCoordinator interceptor = null;
+- public String status;
+- public Exception error;
+- public String startstatus = "new";
+-
+- public Status(CoordinationDemo parent) {
+- this.parent = parent;
+- }
+-
+- public String getStatusLine() {
+- //member - 30
+- //running- 10
+- //coord - 30
+- //view-id - 24
+- //view count - 8
+- StringBuffer buf = new StringBuffer();
+- String local = "";
+- String coord = "";
+- String viewId = "";
+- String count = "0";
+- if ( channel != null ) {
+- Member lm = channel.getLocalMember(false);
+- local = lm!=null?lm.getName():"";
+- coord = interceptor!=null &&
interceptor.getCoordinator()!=null?interceptor.getCoordinator().getName():"";
+- viewId =
getByteString(interceptor.getViewId()!=null?interceptor.getViewId().getBytes():new
byte[0]);
+- count = String.valueOf(interceptor.getView().length);
+- }
+- buf.append(leftfill(local,30," "));
+- buf.append(leftfill(startstatus, 10, " "));
+- buf.append(leftfill(coord, 30, " "));
+- buf.append(leftfill(viewId, 24, " "));
+- buf.append(leftfill(count, 8, " "));
+- buf.append("\n");
+- buf.append("Status:"+status);
+- buf.append("\n");
+- return buf.toString();
+- }
+-
+- public String getByteString(byte[] b) {
+- if ( b == null ) return "{}";
+- return Arrays.toString(b,0,Math.min(b.length,4));
+- }
+-
+- public void start() {
+- try {
+- if ( channel == null ) {
+- channel = createChannel();
+- startstatus = "starting";
+- channel.start(channel.DEFAULT);
+- startstatus = "running";
+- } else {
+- status = "Channel already started.";
+- }
+- } catch ( Exception x ) {
+- synchronized (System.err) {
+- System.err.println("Start failed:");
+- StackTraceElement[] els = x.getStackTrace();
+- for (int i = 0; i < els.length; i++)
System.err.println(els[i].toString());
+- }
+- status = "Start failed:"+x.getMessage();
+- error = x;
+- startstatus = "failed";
+- try { channel.stop(GroupChannel.DEFAULT);}catch(Exception ignore){}
+- channel = null;
+- interceptor = null;
+- }
+- }
+-
+- public void stop() {
+- try {
+- if ( channel != null ) {
+- channel.stop(channel.DEFAULT);
+- status = "Channel Stopped";
+- } else {
+- status = "Channel Already Stopped";
+- }
+- }catch ( Exception x ) {
+- synchronized (System.err) {
+- System.err.println("Stop failed:");
+- StackTraceElement[] els = x.getStackTrace();
+- for (int i = 0; i < els.length; i++)
System.err.println(els[i].toString());
+- }
+-
+- status = "Stop failed:"+x.getMessage();
+- error = x;
+- }finally {
+- startstatus = "stopped";
+- channel = null;
+- interceptor = null;
+- }
+- }
+-
+- public GroupChannel createChannel() {
+- channel = new GroupChannel();
+- ((ReceiverBase)channel.getChannelReceiver()).setAutoBind(100);
+- interceptor = new NonBlockingCoordinator() {
+- public void fireInterceptorEvent(InterceptorEvent event) {
+- status = event.getEventTypeDesc();
+- int type = event.getEventType();
+- boolean display = VIEW_EVENTS[type];
+- if ( display ) parent.printScreen();
+- try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
+- }
+- };
+- channel.addInterceptor(interceptor);
+- channel.addInterceptor(new TcpFailureDetector());
+- channel.addInterceptor(new MessageDispatch15Interceptor());
+- return channel;
+- }
+- }
+-}
+\ No newline at end of file