[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...
Ron Sigal
ron_sigal at yahoo.com
Wed Dec 13 20:03:55 EST 2006
User: rsigal
Date: 06/12/13 20:03:55
Added: src/main/org/jboss/remoting/transport/bisocket Tag:
remoting_2_x BisocketServerInvoker.java
BisocketClientInvoker.java
TransportClientFactory.java
TransportServerFactory.java Bisocket.java
Log:
JBREM-650: Introducing bisocket transport.
Revision Changes Path
No revision
No revision
1.1.2.1 +542 -0 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/BisocketServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketServerInvoker.java
===================================================================
RCS file: BisocketServerInvoker.java
diff -N BisocketServerInvoker.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ BisocketServerInvoker.java 14 Dec 2006 01:03:55 -0000 1.1.2.1
@@ -0,0 +1,542 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.remoting.transport.bisocket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.invocation.InternalInvocation;
+import org.jboss.remoting.transport.PortUtil;
+import org.jboss.remoting.transport.socket.ServerThread;
+import org.jboss.remoting.transport.socket.SocketServerInvoker;
+import org.jboss.remoting.util.TimerUtil;
+
+
+/**
+ *
+ * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1.2.1 $
+ * <p>
+ * Copyright Nov 23, 2006
+ * </p>
+ */
+public class BisocketServerInvoker extends SocketServerInvoker
+{
+ private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap()); // listenerId -> BisocketServerInvoker; filled in handleInternalInvocation(), read by getBisocketServerInvoker()
+
+ private Map listenerIdToInvokerLocatorMap = new HashMap(); // listenerId -> InvokerLocator used when opening control and ordinary sockets
+ private ServerSocket secondaryServerSocket; // created in start() only when Bisocket.START_SECONDARY_PORT is configured as true
+ private InvokerLocator secondaryLocator; // host/port of secondaryServerSocket; returned for GET_SECONDARY_INVOKER_LOCATOR
+ private SecondaryServerSocketThread secondaryServerSocketThread; // daemon thread accepting on secondaryServerSocket
+ private Map controlConnectionThreadMap = new HashMap(); // listenerId -> ControlConnectionThread; most accesses synchronize on the map itself
+ private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT; // value handed to TimerUtil.schedule() for the monitor task (presumably milliseconds -- confirm)
+ private int pingWindow = 2 * pingFrequency; // longest gap since the last read on a control connection before checkConnection() reports failure
+ private ControlMonitorTimerTask controlMonitorTimerTask; // created lazily by the first createControlConnection() call
+
+
+ /**
+  * Looks up the server invoker registered for a callback listener.
+  *
+  * @param listenerId id of the callback listener
+  * @return the invoker stored under listenerId, or null if none is stored
+  */
+ public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
+ {
+    Object registered = listenerIdToServerInvokerMap.get(listenerId);
+    return (BisocketServerInvoker) registered;
+ }
+
+
+// public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
+// {
+// listenerIdToServerInvokerMap.put(listenerId, invoker);
+// }
+
+
+ /**
+  * Creates an invoker for the given locator; all work is delegated to the
+  * superclass constructor.
+  *
+  * @param locator locator handed to the superclass
+  */
+ public BisocketServerInvoker(InvokerLocator locator)
+ {
+    super(locator);
+ }
+
+
+ /**
+  * Creates an invoker for the given locator and configuration; all work is
+  * delegated to the superclass constructor.
+  *
+  * @param locator locator handed to the superclass
+  * @param configuration configuration map handed to the superclass
+  */
+ public BisocketServerInvoker(InvokerLocator locator, Map configuration)
+ {
+    super(locator, configuration);
+ }
+
+
+ /**
+  * Starts the invoker through the superclass and then, if the configuration
+  * entry Bisocket.START_SECONDARY_PORT evaluates to true, opens the secondary
+  * server socket on a free port and starts the daemon thread accepting on it.
+  * The entry may be a String or a Boolean; any other type is logged as an
+  * error and treated as false.
+  *
+  * @throws IOException if the superclass fails to start or the secondary
+  *         server socket cannot be created
+  */
+ public void start() throws IOException
+ {
+    super.start();
+
+    Object configured = configuration.get(Bisocket.START_SECONDARY_PORT);
+    if (configured == null)
+       return;
+
+    boolean secondaryRequested = false;
+    if (configured instanceof String)
+    {
+       secondaryRequested = Boolean.valueOf((String) configured).booleanValue();
+    }
+    else if (configured instanceof Boolean)
+    {
+       secondaryRequested = ((Boolean) configured).booleanValue();
+    }
+    else
+    {
+       log.error("unrecognized value for configuration key \"" +
+                 Bisocket.START_SECONDARY_PORT + "\": " + configured);
+    }
+
+    if (!secondaryRequested)
+       return;
+
+    // The secondary socket is bound to the same address as the primary one.
+    InetAddress host = getServerSocket().getInetAddress();
+    String hostAddress = host.getHostAddress();
+    int freePort = PortUtil.findFreePort(hostAddress);
+    secondaryServerSocket = new ServerSocket(freePort, 0, host);
+    secondaryLocator = new InvokerLocator(null, hostAddress, freePort, null, null);
+
+    SecondaryServerSocketThread acceptor = new SecondaryServerSocketThread(secondaryServerSocket);
+    acceptor.setName("secondaryServerSocketThread");
+    acceptor.setDaemon(true);
+    secondaryServerSocketThread = acceptor;
+    acceptor.start();
+    log.info("started secondary port: " + host + ":" + freePort);
+ }
+
+
+ /**
+  * Reports that this transport is bidirectional.
+  *
+  * @return always true
+  */
+ public boolean isTransportBiDirectional()
+ {
+    return true;
+ }
+
+
+ /**
+  * Opens a socket to the given locator, announces the listener id on it and
+  * starts a daemon ControlConnectionThread that reads commands from that
+  * socket. The first call also schedules the ControlMonitorTimerTask.
+  *
+  * @param listenerId id of the callback listener the connection belongs to
+  * @param locator address to connect to; when null, the locator stored by an
+  *        earlier call for the same listenerId is reused (this is how
+  *        ControlMonitorTimerTask recreates a failed connection)
+  * @throws IOException if no locator is known for listenerId, the socket
+  *         cannot be opened, or the handshake cannot be written
+  */
+ public void createControlConnection(String listenerId, InvokerLocator locator)
+ throws IOException
+ {
+    if (locator == null)
+    {
+       // restarted connection: fall back to the locator remembered earlier
+       locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
+       if (locator == null)
+       {
+          // Fail with the declared exception type instead of a
+          // NullPointerException, which callers such as the timer task do not catch.
+          throw new IOException("no InvokerLocator registered for listener id: " + listenerId);
+       }
+    }
+    else
+    {
+       // first connection: remember the locator for later restarts
+       listenerIdToInvokerLocatorMap.put(listenerId, locator);
+    }
+
+    log.info("creating control connection: " + locator);
+    Socket socket = new Socket(locator.getHost(), locator.getPort());
+    Thread thread = null;
+    try
+    {
+       DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
+       // NOTE(review): SecondaryServerSocketThread routes CREATE_ORDINARY_SOCKET
+       // to transferSocket() and CREATE_CONTROL_SOCKET to replaceControlSocket().
+       // A restarted control connection still announces itself with
+       // CREATE_ORDINARY_SOCKET here -- confirm whether that is intended.
+       dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
+       dos.writeUTF(listenerId);
+       thread = new ControlConnectionThread(socket, listenerId);
+    }
+    catch (IOException e)
+    {
+       // Do not leak the connected socket when the handshake fails.
+       try
+       {
+          socket.close();
+       }
+       catch (IOException ignored)
+       {
+          // best effort: the original failure is the one worth reporting
+       }
+       throw e;
+    }
+    thread.setName("control: " + socket.toString());
+    thread.setDaemon(true);
+
+    synchronized (controlConnectionThreadMap)
+    {
+       controlConnectionThreadMap.put(listenerId, thread);
+    }
+
+    thread.start();
+    log.info("created control connection: " + locator);
+
+    if (controlMonitorTimerTask == null)
+    {
+       controlMonitorTimerTask = new ControlMonitorTimerTask();
+       TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
+       log.info("scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
+    }
+ }
+
+
+ /**
+  * @return the ping frequency currently configured on this invoker
+  */
+ public int getPingFrequency()
+ {
+    return this.pingFrequency;
+ }
+
+
+ /**
+  * Sets the ping frequency and derives the ping window, which is always
+  * twice the frequency.
+  *
+  * @param pingFrequency new ping frequency
+  */
+ public void setPingFrequency(int pingFrequency)
+ {
+    this.pingFrequency = pingFrequency;
+    this.pingWindow = 2 * pingFrequency;
+    log.info("set ping frequency: " + pingFrequency);
+ }
+
+
+ /**
+  * Releases the resources owned by this invoker after the superclass has
+  * cleaned up: cancels the control monitor task, shuts down every control
+  * connection thread, stops the secondary accept thread and closes the
+  * secondary server socket.
+  */
+ protected void cleanup()
+ {
+    super.cleanup();
+
+    if (controlMonitorTimerTask != null)
+    {
+       controlMonitorTimerTask.shutdown();
+    }
+
+    // Take a snapshot and empty the map under its lock. Other threads add
+    // entries while holding the same lock, so iterating the live map here
+    // could fail with a ConcurrentModificationException.
+    Collection threads = null;
+    synchronized (controlConnectionThreadMap)
+    {
+       threads = new HashSet(controlConnectionThreadMap.values());
+       controlConnectionThreadMap.clear();
+    }
+    Iterator it = threads.iterator();
+    while (it.hasNext())
+    {
+       ControlConnectionThread t = (ControlConnectionThread) it.next();
+       t.shutdown();
+    }
+
+    if (secondaryServerSocketThread != null)
+    {
+       // Clears the running flag first so the accept loop exits quietly
+       // once the server socket below is closed.
+       secondaryServerSocketThread.shutdown();
+    }
+    if (secondaryServerSocket != null)
+    {
+       try
+       {
+          secondaryServerSocket.close();
+          log.info("closed secondary port: " +
+                   secondaryServerSocket.getInetAddress() + ":" +
+                   secondaryServerSocket.getLocalPort());
+       }
+       catch (IOException e)
+       {
+          log.info("Error closing secondary server socket: " + e.getMessage());
+       }
+    }
+ }
+
+
+ /**
+  * @return the locator of the secondary server socket, or null if the
+  *         secondary port has not been started
+  */
+ protected InvokerLocator getSecondaryLocator()
+ {
+    return this.secondaryLocator;
+ }
+
+ /**
+  * @return the serverSocket field visible to this class (the socket whose
+  *         address start() reuses for the secondary port)
+  */
+ protected ServerSocket getServerSocket()
+ {
+    return this.serverSocket;
+ }
+
+
+ /**
+  * Handles the bisocket-specific internal invocations and delegates the rest
+  * to the superclass.
+  * <p>
+  * GET_SECONDARY_INVOKER_LOCATOR is answered directly with the secondary
+  * locator. After the superclass has processed an ADDCLIENTLISTENER
+  * invocation, this invoker is registered under the listener id found in the
+  * request payload so getBisocketServerInvoker() can find it.
+  *
+  * @param ii internal invocation to process
+  * @param ir request carrying the invocation
+  * @param handler handler passed through to the superclass
+  * @return the secondary locator, or whatever the superclass returns
+  * @throws Throwable whatever the superclass throws
+  */
+ protected Object handleInternalInvocation(InternalInvocation ii,
+ InvocationRequest ir,
+ ServerInvocationHandler handler)
+ throws Throwable
+ {
+    boolean locatorRequested = Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName());
+    if (locatorRequested)
+    {
+       log.info("returning secondaryLocator: " + secondaryLocator);
+       return secondaryLocator;
+    }
+
+    Object response = super.handleInternalInvocation(ii, ir, handler);
+
+    if (!InternalInvocation.ADDCLIENTLISTENER.equals(ii.getMethodName()))
+       return response;
+
+    Map metadata = ir.getRequestPayload();
+    if (metadata == null)
+       return response;
+
+    String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
+    if (listenerId != null)
+    {
+       listenerIdToServerInvokerMap.put(listenerId, this);
+       log.info("registered " + listenerId + " -> " + this);
+    }
+    return response;
+ }
+
+
+ /**
+  * Thread that reads one-byte commands from a control socket. A
+  * CREATE_ORDINARY_SOCKET command makes it open a new connection to the
+  * locator stored for its listener id and hand that socket to a ServerThread;
+  * a PING only refreshes the time stamp inspected by checkConnection().
+  */
+ class ControlConnectionThread extends Thread
+ {
+    private Socket controlSocket;
+    private String listenerId;
+    private DataInputStream dis;
+    // Cleared by shutdown() on other threads and read by run().
+    private volatile boolean running;
+    private int errorCount;
+    // Written by run() and read by checkConnection() on the timer thread.
+    private volatile long lastPing = System.currentTimeMillis();
+
+    /**
+     * @param socket control socket to read commands from
+     * @param listenerId id of the callback listener served by this thread
+     * @throws IOException if the socket's input stream cannot be obtained
+     */
+    ControlConnectionThread(Socket socket, String listenerId) throws IOException
+    {
+       controlSocket = socket;
+       this.listenerId = listenerId;
+       dis = new DataInputStream(socket.getInputStream());
+    }
+
+    /**
+     * Stops the read loop, closes the control socket and interrupts the thread.
+     */
+    void shutdown()
+    {
+       running = false;
+       synchronized (controlConnectionThreadMap)
+       {
+          // NOTE(review): the map is keyed by listenerId (see
+          // createControlConnection), so removing by thread instance looks
+          // like a no-op. Left unchanged: ControlMonitorTimerTask finds dead
+          // connections through the entries that remain in the map -- confirm intent.
+          controlConnectionThreadMap.remove(this);
+       }
+       try
+       {
+          controlSocket.close();
+       }
+       catch (IOException e)
+       {
+          log.warn("unable to close controlSocket");
+       }
+       interrupt();
+    }
+
+    /**
+     * @return true if a byte was read from the control socket within the
+     *         last pingWindow
+     */
+    boolean checkConnection()
+    {
+       long elapsed = System.currentTimeMillis() - lastPing;
+       boolean alive = elapsed <= pingWindow;
+       log.info("elapsed: " + elapsed);
+       log.info("returning: " + alive);
+       return alive;
+    }
+
+    String getListenerId()
+    {
+       return listenerId;
+    }
+
+    public void run()
+    {
+       log.info("starting ControlConnectionThread");
+       running = true;
+       while (running)
+       {
+          Socket socket = null;
+
+          try
+          {
+             int action = dis.read();
+             lastPing = System.currentTimeMillis();
+
+             switch (action)
+             {
+                case Bisocket.CREATE_ORDINARY_SOCKET:
+                   socket = openOrdinarySocket();
+                   break;
+
+                case Bisocket.PING:
+                   log.info("got ping");
+                   continue;
+
+                case -1:
+                   // end of stream: the peer closed the control connection
+                   shutdown();
+                   return;
+
+                default:
+                   log.error("unrecognized action: " + action);
+                   continue;
+             }
+          }
+          catch (IOException e)
+          {
+             if (!running)
+                return;
+
+             // The exact capitalisation of this message is not guaranteed.
+             if ("socket closed".equalsIgnoreCase(e.getMessage()))
+             {
+                shutdown();
+                return;
+             }
+             log.error("Unable to create new Socket: " + e.getMessage(), e);
+             if (++errorCount > 5)
+             {
+                shutdown();
+                return;
+             }
+             continue;
+          }
+
+          startServerThread(socket);
+       }
+    }
+
+    /**
+     * Connects to the locator stored for this listener id and writes the
+     * CREATE_ORDINARY_SOCKET handshake; closes the socket if that fails.
+     */
+    private Socket openOrdinarySocket() throws IOException
+    {
+       InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
+       if (locator == null)
+          throw new IOException("no InvokerLocator registered for listener id: " + listenerId);
+
+       Socket socket = new Socket(locator.getHost(), locator.getPort());
+       try
+       {
+          DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
+          dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
+          dos.writeUTF(listenerId);
+          return socket;
+       }
+       catch (IOException e)
+       {
+          closeQuietly(socket);
+          throw e;
+       }
+    }
+
+    /**
+     * Hands the socket to a new ServerThread if the client pool has room;
+     * otherwise, or if the thread cannot be started, closes the socket.
+     */
+    private void startServerThread(Socket socket)
+    {
+       synchronized (clientpool)
+       {
+          if (clientpool.size() >= maxPoolSize)
+          {
+             // Nobody will service this socket, so close it instead of
+             // leaving the peer waiting on an orphaned connection.
+             log.warn("client pool is full: closing socket " + socket);
+             closeQuietly(socket);
+             return;
+          }
+
+          try
+          {
+             Thread thread = new ServerThread(socket, BisocketServerInvoker.this,
+                                              clientpool, threadpool,
+                                              getTimeout(), serverSocketClass);
+             thread.start();
+             log.info("created: " + thread);
+
+             // Only a thread that was created and started goes into the
+             // pool; previously a null reference was added on failure.
+             synchronized (threadpool)
+             {
+                threadpool.add(thread);
+             }
+          }
+          catch (Exception e)
+          {
+             log.error("Unable to create new ServerThread: " + e.getMessage(), e);
+             closeQuietly(socket);
+          }
+       }
+    }
+
+    /** Closes a socket, ignoring any failure to do so. */
+    private void closeQuietly(Socket socket)
+    {
+       try
+       {
+          socket.close();
+       }
+       catch (IOException ignored)
+       {
+          // best effort only
+       }
+    }
+ }
+
+
+// protected ServerThread createServerThread(Socket socket) throws Exception
+// {
+// return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
+// }
+
+
+ /**
+  * Thread that accepts connections on the secondary server socket. Each
+  * connection starts with an action byte and a UTF-encoded listener id:
+  * CREATE_CONTROL_SOCKET passes the socket to the matching
+  * BisocketClientInvoker as its new control socket, CREATE_ORDINARY_SOCKET
+  * passes it to BisocketClientInvoker.transferSocket().
+  */
+ class SecondaryServerSocketThread extends Thread
+ {
+    private ServerSocket secondaryServerSocket;
+    // Cleared by shutdown() on another thread and read by run().
+    volatile boolean running;
+
+    SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
+    {
+       this.secondaryServerSocket = secondaryServerSocket;
+    }
+
+    /**
+     * Clears the running flag and interrupts the thread.
+     */
+    void shutdown()
+    {
+       running = false;
+       interrupt();
+    }
+
+    public void run()
+    {
+       running = true;
+       while (running)
+       {
+          Socket socket = null;
+          // True once another object has taken ownership of the socket.
+          boolean handedOff = false;
+
+          try
+          {
+             socket = secondaryServerSocket.accept();
+             DataInputStream dis = new DataInputStream(socket.getInputStream());
+             int action = dis.read();
+             String listenerId = dis.readUTF();
+
+             switch (action)
+             {
+                case Bisocket.CREATE_CONTROL_SOCKET:
+                   BisocketClientInvoker invoker;
+                   invoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
+                   if (invoker == null)
+                   {
+                      // An unknown listener id must not kill the accept
+                      // thread with a NullPointerException.
+                      log.error("no BisocketClientInvoker registered for listener id: " + listenerId);
+                      break;
+                   }
+                   invoker.replaceControlSocket(socket);
+                   handedOff = true;
+                   log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
+                   break;
+
+                case Bisocket.CREATE_ORDINARY_SOCKET:
+                   BisocketClientInvoker.transferSocket(listenerId, socket);
+                   handedOff = true;
+                   log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
+                   break;
+
+                default:
+                   log.error("unrecognized action: " + action);
+             }
+          }
+          catch (IOException e)
+          {
+             if (running)
+                log.error("Failed to accept socket connection", e);
+             else
+                return;
+          }
+          finally
+          {
+             // Close sockets nobody took over: failed handshake, unknown
+             // action or unknown listener id.
+             if (!handedOff && socket != null)
+             {
+                try
+                {
+                   socket.close();
+                }
+                catch (IOException ignored)
+                {
+                   // best effort only
+                }
+             }
+          }
+       }
+    }
+
+    ServerSocket getServerSocket()
+    {
+       return secondaryServerSocket;
+    }
+ }
+
+
+ /**
+  * Timer task that checks every control connection and recreates those that
+  * checkConnection() reports as failed.
+  */
+ class ControlMonitorTimerTask extends TimerTask
+ {
+    /**
+     * Cancels this task.
+     */
+    void shutdown()
+    {
+       // NOTE(review): 'running' is not declared in this class, so it must
+       // resolve to a field visible through the enclosing invoker; clearing
+       // it here therefore affects more than this task -- confirm intent.
+       running = false;
+       cancel();
+       log.info("shutting down " + this);
+    }
+
+    public void run()
+    {
+       // Work on a snapshot so the map lock is not held while sockets are
+       // closed and reopened.
+       Collection controlConnectionThreads = null;
+       synchronized (controlConnectionThreadMap)
+       {
+          controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
+       }
+
+       Iterator it = controlConnectionThreads.iterator();
+       while (running && it.hasNext())
+       {
+          ControlConnectionThread t = (ControlConnectionThread) it.next();
+          if (t.checkConnection())
+             continue;
+
+          log.info("detected failure on control connection: requesting new control connection");
+          t.shutdown();
+
+          if (!running)
+             return;
+
+          try
+          {
+             createControlConnection(t.getListenerId(), null);
+          }
+          catch (Exception e)
+          {
+             // Catch unchecked exceptions as well: an exception escaping
+             // run() terminates the thread of the timer executing this task.
+             InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
+             log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
+          }
+       }
+    }
+ }
+
+
+// class InternalInvocationHandler implements ServerInvocationHandler
+// {
+// InvokerLocator locator;
+//
+// InternalInvocationHandler(ServerSocket ss)
+// {
+// String host = ss.getInetAddress().getHostAddress();
+// int port = ss.getLocalPort();
+// locator = new InvokerLocator(null, host, port, null, null);
+// }
+//
+// public Object invoke(InvocationRequest invocation) throws Throwable
+// {
+// return locator;
+// }
+//
+// public void setMBeanServer(MBeanServer server) {}
+// public void setInvoker(ServerInvoker invoker) {}
+// public void addListener(InvokerCallbackHandler callbackHandler) {}
+// public void removeListener(InvokerCallbackHandler callbackHandler) {}
+// }
+}
\ No newline at end of file
1.1.2.1 +404 -0 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/BisocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BisocketClientInvoker.java
===================================================================
RCS file: BisocketClientInvoker.java
diff -N BisocketClientInvoker.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ BisocketClientInvoker.java 14 Dec 2006 01:03:55 -0000 1.1.2.1
@@ -0,0 +1,404 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+
+package org.jboss.remoting.transport.bisocket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.ConnectionFailedException;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.invocation.InternalInvocation;
+import org.jboss.remoting.marshal.Marshaller;
+import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.remoting.transport.BidirectionalClientInvoker;
+import org.jboss.remoting.transport.socket.SocketClientInvoker;
+import org.jboss.remoting.util.TimerUtil;
+
+/**
+ * BisocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker,
+ * which must be a BisocketServerInvoker.
+ *
+ * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
+ */
+public class BisocketClientInvoker
+extends SocketClientInvoker
+implements BidirectionalClientInvoker
+{
+ private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
+ private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap()); // listenerId -> BisocketClientInvoker, registered in the constructor
+ private static Map listenerIdToSocketsMap = new HashMap(); // listenerId -> Set of Sockets delivered by transferSocket(); most accesses synchronize on the map itself
+
+ private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;; // value handed to TimerUtil.schedule() for PingTimerTask (presumably milliseconds -- confirm)
+ private String listenerId; // taken from config under Client.LISTENER_ID_KEY; null when no listener id was configured
+ private InvokerLocator secondaryLocator; // fetched in handleConnect() when listenerId is null
+ private Socket controlSocket; // set in createSocket() or replaceControlSocket()
+ private OutputStream controlOutputStream; // output stream of controlSocket; also used as the lock object around writes
+ private PingTimerTask pingTimerTask; // task currently scheduled to write Bisocket.PING to controlOutputStream
+
+
+ /**
+  * Looks up the client invoker registered for a callback listener.
+  *
+  * @param listenerId id of the callback listener
+  * @return the invoker stored under listenerId, or null if none is stored
+  */
+ static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
+ {
+    Object registered = listenerIdToClientInvokerMap.get(listenerId);
+    return (BisocketClientInvoker) registered;
+ }
+
+
+ /**
+  * Adds a socket to the set kept for the given listener id, creating the set
+  * if necessary, and notifies one thread waiting on that set.
+  *
+  * @param listenerId id of the callback listener the socket belongs to
+  * @param socket socket to hand over
+  */
+ static void transferSocket(String listenerId, Socket socket)
+ {
+    Set pending;
+
+    synchronized (listenerIdToSocketsMap)
+    {
+       Object existing = listenerIdToSocketsMap.get(listenerId);
+       if (existing != null)
+       {
+          pending = (Set) existing;
+       }
+       else
+       {
+          pending = new HashSet();
+          listenerIdToSocketsMap.put(listenerId, pending);
+       }
+    }
+
+    // createSocket() waits on the set itself, so notify on the same monitor.
+    synchronized (pending)
+    {
+       pending.add(socket);
+       pending.notify();
+    }
+ }
+
+
+ /**
+  * Creates an invoker without any configuration; equivalent to passing a
+  * null configuration map to the two-argument constructor.
+  *
+  * @param locator locator of the server to connect to
+  * @throws IOException if the two-argument constructor throws it
+  */
+ public BisocketClientInvoker(InvokerLocator locator) throws IOException
+ {
+    this(locator, null);
+ }
+
+
+ /**
+  * Creates an invoker. If the configuration carries a listener id under
+  * Client.LISTENER_ID_KEY, the invoker registers itself under that id and
+  * makes sure a socket set exists for it. A Bisocket.PING_FREQUENCY entry,
+  * given as a Number or as a numeric string, overrides the default ping
+  * frequency.
+  *
+  * @param locator locator handed to the superclass
+  * @param config configuration map, may be null
+  * @throws IOException if the superclass constructor throws it
+  */
+ public BisocketClientInvoker(InvokerLocator locator, Map config) throws IOException
+ {
+    super(locator, config);
+
+    if (config != null)
+    {
+       listenerId = (String) config.get(Client.LISTENER_ID_KEY);
+       if (listenerId != null)
+       {
+          listenerIdToClientInvokerMap.put(listenerId, this);
+
+          synchronized (listenerIdToSocketsMap)
+          {
+             if (listenerIdToSocketsMap.get(listenerId) == null)
+                listenerIdToSocketsMap.put(listenerId, new HashSet());
+          }
+
+          log.info("registered " + listenerId + " -> " + this);
+       }
+
+       // look for pingFrequency param
+       Object val = config.get(Bisocket.PING_FREQUENCY);
+       if (val != null)
+       {
+          try
+          {
+             // Accept numeric objects as well as strings; a plain cast to
+             // String rejected values such as Integer.
+             if (val instanceof Number)
+                pingFrequency = ((Number) val).intValue();
+             else
+                pingFrequency = Integer.parseInt(val.toString());
+             log.debug("Setting ping frequency to: " + pingFrequency);
+          }
+          catch (NumberFormatException e)
+          {
+             log.warn("Could not convert " + Bisocket.PING_FREQUENCY +
+                      " value of " + val + " to an int value.");
+          }
+       }
+    }
+ }
+
+
+ /**
+  * @return the ping frequency currently configured on this invoker
+  */
+ public int getPingFrequency()
+ {
+    return this.pingFrequency;
+ }
+
+
+ /**
+  * Sets the ping frequency used for ping tasks scheduled from now on.
+  *
+  * @param pingFrequency new ping frequency
+  */
+ public void setPingFrequency(int pingFrequency)
+ {
+    this.pingFrequency = pingFrequency;
+    String message = "set ping frequency: " + pingFrequency;
+    log.info(message);
+ }
+
+
+ /**
+  * Connects through the superclass. An invoker without a listener id then
+  * asks the server for the locator of its secondary server socket and stores
+  * it; an invoker with a listener id does nothing further.
+  *
+  * @throws ConnectionFailedException if the secondary locator cannot be
+  *         retrieved
+  */
+ protected void handleConnect() throws ConnectionFailedException
+ {
+    super.handleConnect();
+
+    // Callback client on server side: no secondary locator to fetch.
+    boolean callbackClient = (listenerId != null);
+    if (callbackClient)
+       return;
+
+    // Client on client side.
+    try
+    {
+       InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
+       InvocationRequest request = new InvocationRequest(null, null, ii, null, null, null);
+       Object answer = invoke(request);
+       log.info("secondary locator: " + answer);
+       secondaryLocator = (InvokerLocator) answer;
+       log.info("got secondary locator: " + secondaryLocator);
+    }
+    catch (Throwable e)
+    {
+       log.error("Unable to retrieve address/port of secondary server socket", e);
+       throw new ConnectionFailedException(e.getMessage());
+    }
+ }
+
+
+ /**
+  * Disconnects through the superclass and, for an invoker with a listener
+  * id, removes its registrations and cancels its ping task.
+  */
+ protected void handleDisconnect()
+ {
+    super.handleDisconnect();
+    if (listenerId == null)
+       return;
+
+    listenerIdToClientInvokerMap.remove(listenerId);
+
+    // listenerIdToSocketsMap is a plain HashMap that every other accessor
+    // guards with its own monitor, so do the same here.
+    synchronized (listenerIdToSocketsMap)
+    {
+       listenerIdToSocketsMap.remove(listenerId);
+    }
+
+    // Read the field once: PingTimerTask.run() may set it to null on the
+    // timer thread between a null check and the call.
+    PingTimerTask task = pingTimerTask;
+    if (task != null)
+       task.shutDown();
+
+    // NOTE(review): controlSocket is not closed here -- confirm whether it
+    // is released elsewhere.
+ }
+
+
+ /**
+  * Sends the invocation through the superclass. Before doing so, an
+  * ADDLISTENER internal invocation makes the BisocketServerInvoker registered
+  * for the listener id open its control connection to the secondary locator.
+  * If the payload carries no listener id, or no BisocketServerInvoker is
+  * registered for it, the control connection is skipped and the invocation
+  * is still sent.
+  *
+  * @param sessionId session id passed to the superclass
+  * @param invocation invocation to send
+  * @param metadata metadata passed to the superclass
+  * @param marshaller marshaller passed to the superclass
+  * @param unmarshaller unmarshaller passed to the superclass
+  * @return the result returned by the superclass
+  * @throws IOException if the control connection cannot be created or the
+  *         superclass throws it
+  * @throws ConnectionFailedException if the superclass throws it
+  * @throws ClassNotFoundException if the superclass throws it
+  */
+ protected Object transport(String sessionId, Object invocation, Map metadata,
+ Marshaller marshaller, UnMarshaller unmarshaller)
+ throws IOException, ConnectionFailedException, ClassNotFoundException
+ {
+    if (invocation instanceof InvocationRequest)
+    {
+       InvocationRequest ir = (InvocationRequest) invocation;
+       Object o = ir.getParameter();
+       if (o instanceof InternalInvocation)
+       {
+          InternalInvocation ii = (InternalInvocation) o;
+          if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName()))
+          {
+             Map requestPayload = ir.getRequestPayload();
+             // Named differently from the field so it is not shadowed.
+             String callbackListenerId = null;
+             if (requestPayload != null)
+                callbackListenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
+
+             BisocketServerInvoker callbackServerInvoker = null;
+             if (callbackListenerId != null)
+                callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(callbackListenerId);
+
+             if (callbackServerInvoker != null)
+             {
+                callbackServerInvoker.createControlConnection(callbackListenerId, secondaryLocator);
+             }
+             else
+             {
+                // Previously a NullPointerException; without a registered
+                // bisocket callback server there is nothing to connect.
+                log.debug("no BisocketServerInvoker registered for listener id " +
+                          callbackListenerId + ": not creating control connection");
+             }
+          }
+       }
+    }
+
+    return super.transport(sessionId, invocation, metadata, marshaller, unmarshaller);
+ }
+
+
+ protected Socket createSocket(String address, int port) throws IOException // returns a socket for this invoker; with a listenerId the socket comes from the set filled by transferSocket() rather than from a new outgoing connection
+ {
+ if (listenerId == null)
+ return super.createSocket(address, port); // no listener id: let the superclass create the socket
+
+ Socket socket = null;
+ Set sockets = null;
+
+ synchronized (listenerIdToSocketsMap)
+ {
+ sockets = (Set) listenerIdToSocketsMap.get(listenerId); // NOTE(review): null if handleDisconnect() already removed the entry; the synchronized block below would then throw NullPointerException
+ }
+
+ synchronized (sockets)
+ {
+ Iterator it = sockets.iterator();
+ if (it.hasNext())
+ {
+ socket = (Socket) it.next();
+ it.remove();
+
+ if (controlSocket != null)
+ return socket; // a control socket already exists, so the socket taken is handed out directly
+ else
+ {
+ controlSocket = socket; // the first socket taken becomes the control socket
+ controlOutputStream = controlSocket.getOutputStream();
+ log.info("got control socket");
+ pingTimerTask = new PingTimerTask();
+ TimerUtil.schedule(pingTimerTask, pingFrequency);
+ log.info("scheduled PingTimerTask");
+
+ if (it.hasNext()) // a second socket is already waiting: return it without asking for a new one
+ {
+ socket = (Socket) it.next();
+ it.remove();
+ return socket;
+ }
+ }
+ }
+
+ if (controlSocket == null) // the set was empty and no control socket exists yet: wait for one to be transferred
+ {
+ while (true)
+ {
+ try
+ {
+ sockets.wait(getTimeout()); // released by the notify() in transferSocket() or when the timeout elapses
+ if (sockets.isEmpty())
+ throw new IOException("Timed out trying to create control socket");
+ break;
+ }
+ catch (InterruptedException ignored) {} // an interrupt restarts the wait with the full timeout
+ }
+
+ it = sockets.iterator();
+ controlSocket = (Socket) it.next();
+ controlOutputStream = controlSocket.getOutputStream();
+ it.remove();
+ log.info("got control socket");
+ pingTimerTask = new PingTimerTask();
+ TimerUtil.schedule(pingTimerTask, pingFrequency);
+ log.info("scheduled PingTimerTask");
+ }
+
+ log.info("requesting socket");
+
+ synchronized (controlOutputStream) // PingTimerTask locks the same stream, so the request byte and pings are written one at a time
+ {
+ controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET); // asks whoever reads the control socket to open a new connection (ControlConnectionThread.run() handles this action)
+ }
+// controlOutputStream.writeUTF(listenerId);
+
+ while (true)
+ {
+ try
+ {
+ sockets.wait(getTimeout()); // wait for the requested socket to arrive through transferSocket()
+ if (sockets.isEmpty())
+ throw new IOException("Timed out trying to create socket");
+ break;
+ }
+ catch (InterruptedException ignored) {} // an interrupt restarts the wait with the full timeout
+ }
+
+ log.info("got socket");
+ it = sockets.iterator();
+ socket = (Socket) it.next();
+ it.remove();
+ return socket;
+ }
+ }
+
+
+ /**
+  * Installs a new control socket, closes the one it replaces and restarts
+  * the ping task so pings are written to the new socket.
+  *
+  * @param socket the new control socket
+  * @throws IOException if the output stream of the new socket cannot be
+  *         obtained; the previous control socket is left in place then
+  */
+ synchronized void replaceControlSocket(Socket socket) throws IOException
+ {
+    // Get the stream first so a failure leaves the current state untouched.
+    // The previous code locked on the controlSocket field itself, which
+    // fails when it is null and gives no mutual exclusion once reassigned.
+    OutputStream replacementStream = socket.getOutputStream();
+    Socket previousSocket = controlSocket;
+    controlSocket = socket;
+    controlOutputStream = replacementStream;
+
+    PingTimerTask previousTask = pingTimerTask;
+    if (previousTask != null)
+       previousTask.cancel();
+
+    pingTimerTask = new PingTimerTask();
+    TimerUtil.schedule(pingTimerTask, pingFrequency);
+    log.info("replaced PingTimerTask");
+
+    // The superseded socket is no longer referenced anywhere: release it.
+    if (previousSocket != null && previousSocket != socket)
+    {
+       try
+       {
+          previousSocket.close();
+       }
+       catch (IOException e)
+       {
+          log.warn("unable to close replaced control socket: " + e.getMessage());
+       }
+    }
+ }
+
+
+ /**
+  * Verifies that an object is an instance of the given class.
+  *
+  * @param o object to check
+  * @param c class the object must be an instance of
+  * @return o, unchanged
+  * @throws IOException if o is not an instance of c (including when o is null)
+  */
+ protected Object checkType(Object o, Class c) throws IOException
+ {
+    if (c.isInstance(o))
+       return o;
+
+    // The message used to lack the space between the object and "is".
+    throw new IOException(o + " is not an instance of " + c);
+ }
+
+ /**
+  * Verifies that a value is present.
+  *
+  * @param o value to check
+  * @param field name used in the error message
+  * @return o, unchanged
+  * @throws IOException if o is null
+  */
+ protected Object checkNull(Object o, String field) throws IOException
+ {
+    if (o != null)
+       return o;
+
+    throw new IOException(field + " must not be null");
+ }
+
+
+ /**
+  * Builds the locator of the callback server from the protocol, host and
+  * port entries of the metadata map.
+  *
+  * @param metadata map read under Client.CALLBACK_SERVER_PROTOCOL,
+  *        Client.CALLBACK_SERVER_HOST and Client.CALLBACK_SERVER_PORT; it is
+  *        also passed to the locator as its parameter map
+  * @return locator with path "callback"; the port is -1 when none is given
+  * @throws RuntimeException if the port entry is not a number
+  */
+ public InvokerLocator getCallbackLocator(Map metadata)
+ {
+    String transport = (String) metadata.get(Client.CALLBACK_SERVER_PROTOCOL);
+    String host = (String) metadata.get(Client.CALLBACK_SERVER_HOST);
+    String sPort = (String) metadata.get(Client.CALLBACK_SERVER_PORT);
+    int port = -1;
+    if (sPort != null)
+    {
+       try
+       {
+          port = Integer.parseInt(sPort);
+       }
+       catch (NumberFormatException e)
+       {
+          // Close the parenthesis in the message and keep the cause.
+          throw new RuntimeException("Can not set internal callback server port as configuration value (" + sPort + ") is not a number.", e);
+       }
+    }
+
+    return new InvokerLocator(transport, host, port, "callback", metadata);
+ }
+
+
+ /**
+  * Timer task that writes Bisocket.PING to the control output stream each
+  * time it runs and cancels itself when the write fails.
+  */
+ class PingTimerTask extends TimerTask
+ {
+    /**
+     * Cancels this task.
+     */
+    public void shutDown()
+    {
+       cancel();
+    }
+
+    public void run()
+    {
+       // Read the field once so the object locked is the object written to,
+       // even if replaceControlSocket() swaps the stream in between.
+       OutputStream out = controlOutputStream;
+       synchronized (out)
+       {
+          try
+          {
+             log.info("sending ping");
+             out.write(Bisocket.PING);
+          }
+          catch (IOException e)
+          {
+             log.warn("Unable to send ping: shutting down PingTimerTask");
+             // Clear the reference only if it still points at this task; a
+             // newer task may have been installed by replaceControlSocket().
+             if (pingTimerTask == this)
+                pingTimerTask = null;
+             cancel();
+          }
+       }
+    }
+ }
+}
\ No newline at end of file
1.1.2.1 +51 -0 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/TransportClientFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TransportClientFactory.java
===================================================================
RCS file: TransportClientFactory.java
diff -N TransportClientFactory.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ TransportClientFactory.java 14 Dec 2006 01:03:55 -0000 1.1.2.1
@@ -0,0 +1,51 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.remoting.transport.bisocket;
+
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.transport.ClientFactory;
+import org.jboss.remoting.transport.ClientInvoker;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ *
+ * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1.2.1 $
+ * <p>
+ * Copyright Nov 25, 2006
+ * </p>
+ */
+/**
+ * ClientFactory of the bisocket transport; it produces BisocketClientInvoker
+ * instances.
+ */
+public class TransportClientFactory implements ClientFactory
+{
+ /**
+  * Creates a client invoker for the given locator.
+  *
+  * @param locator locator of the server to connect to
+  * @param config configuration passed to the invoker
+  * @return a new BisocketClientInvoker
+  * @throws IOException if the invoker's constructor throws it
+  */
+ public ClientInvoker createClientInvoker(InvokerLocator locator, Map config)
+ throws IOException
+ {
+    ClientInvoker invoker = new BisocketClientInvoker(locator, config);
+    return invoker;
+ }
+
+ /**
+  * @return always false
+  */
+ public boolean supportsSSL()
+ {
+    return false;
+ }
+}
\ No newline at end of file
1.1.2.1 +28 -0 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/TransportServerFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TransportServerFactory.java
===================================================================
RCS file: TransportServerFactory.java
diff -N TransportServerFactory.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ TransportServerFactory.java 14 Dec 2006 01:03:55 -0000 1.1.2.1
@@ -0,0 +1,28 @@
+package org.jboss.remoting.transport.bisocket;
+
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.transport.ServerFactory;
+
+import java.util.Map;
+
+/**
+ *
+ * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1.2.1 $
+ * <p>
+ * Copyright Nov 25, 2006
+ * </p>
+ */
+/**
+ * ServerFactory of the bisocket transport; it produces BisocketServerInvoker
+ * instances.
+ */
+public class TransportServerFactory implements ServerFactory
+{
+ /**
+  * Creates a server invoker for the given locator.
+  *
+  * @param locator locator the invoker is created for
+  * @param config configuration passed to the invoker
+  * @return a new BisocketServerInvoker
+  */
+ public ServerInvoker createServerInvoker(InvokerLocator locator, Map config)
+ {
+    ServerInvoker invoker = new BisocketServerInvoker(locator, config);
+    return invoker;
+ }
+
+ /**
+  * @return always false
+  */
+ public boolean supportsSSL()
+ {
+    return false;
+ }
+}
1.1.2.1 +45 -0 JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/Attic/Bisocket.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Bisocket.java
===================================================================
RCS file: Bisocket.java
diff -N Bisocket.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ Bisocket.java 14 Dec 2006 01:03:55 -0000 1.1.2.1
@@ -0,0 +1,45 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.remoting.transport.bisocket;
+
+/**
+ * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1.2.1 $
+ * <p>
+ * Copyright Nov 22, 2006
+ * </p>
+ */
+/**
+ * Constants shared by the classes of the bisocket transport.
+ */
+public class Bisocket
+{
+ /** Subsystem name reserved for internal bisocket invocations. */
+ public static final String BISOCKET_INTERNAL_SUBSYSTEM = "bisocketInternalSystem";
+ /** Name of the "create callback socket" internal method. */
+ public static final String CREATE_CALLBACK_SOCKET = "createCallbackSocket";
+ /** Internal method name a client uses to ask for the secondary locator. */
+ public static final String GET_SECONDARY_INVOKER_LOCATOR = "getSecondaryInvokerLocator";
+
+ /** Configuration key: whether the server invoker opens a secondary port. */
+ public static final String START_SECONDARY_PORT = "startSecondaryPort";
+
+ /** Action byte: keep-alive written on a control connection. */
+ public static final byte PING = 1;
+ /** Action byte: the connection is a replacement control socket. */
+ public static final byte CREATE_CONTROL_SOCKET = 2;
+ /** Action byte: request for, or announcement of, an ordinary socket. */
+ public static final byte CREATE_ORDINARY_SOCKET = 3;
+
+ /** Configuration key for the ping frequency. */
+ public static final String PING_FREQUENCY = "pingFrequency";
+ /** Ping frequency used when none is configured. */
+ public static final int PING_FREQUENCY_DEFAULT = 5000;
+}
More information about the jboss-cvs-commits
mailing list