[jboss-cvs] jboss-seam/src/remoting/org/jboss/seam/remoting/messaging ...

Shane Bryzak sbryzak at redhat.com
Tue Feb 27 17:15:24 EST 2007


  User: sbryzak2
  Date: 07/02/27 17:15:24

  Added:       src/remoting/org/jboss/seam/remoting/messaging       
                        JBossConnectionProvider.java
                        JMSConnectionProvider.java PollError.java
                        PollRequest.java RemoteSubscriber.java
                        SubscriptionRegistry.java SubscriptionRequest.java
  Log:
  JBSEAM-915
  
  Revision  Changes    Path
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/JBossConnectionProvider.java
  
  Index: JBossConnectionProvider.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import javax.jms.TopicConnection;
  import javax.jms.TopicConnectionFactory;
  import javax.naming.InitialContext;
  
  /**
   * {@link JMSConnectionProvider} that creates topic connections from the
   * connection factory bound in JNDI under the name
   * <code>UIL2ConnectionFactory</code>.
   *
   * @author Shane Bryzak
   */
  public class JBossConnectionProvider implements JMSConnectionProvider
  {
    private static final String FACTORY_JNDI_NAME = "UIL2ConnectionFactory";
  
    /**
     * Looks up the topic connection factory in the default initial context and
     * creates a new topic connection from it.
     *
     * @return a newly created topic connection
     * @throws Exception if the JNDI lookup or the connection creation fails
     */
    public TopicConnection createConnection()
      throws Exception
    {
      InitialContext ctx = new InitialContext();
      try
      {
        TopicConnectionFactory f = (TopicConnectionFactory) ctx.lookup(FACTORY_JNDI_NAME);
        return f.createTopicConnection();
      }
      finally
      {
        // Always release the naming context; it is only needed for the lookup.
        try
        {
          ctx.close();
        }
        catch (javax.naming.NamingException ignored)
        {
          // A failure to release the context must neither mask a lookup error
          // nor discard a connection that was created successfully.
        }
      }
    }
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/JMSConnectionProvider.java
  
  Index: JMSConnectionProvider.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import javax.jms.TopicConnection;
  
  /**
   * Strategy for obtaining the JMS topic connection used by the remoting
   * messaging classes.
   *
   * @author Shane Bryzak
   */
  public interface JMSConnectionProvider
  {
    /**
     * Creates a JMS topic connection.
     *
     * @return the topic connection
     * @throws Exception if the connection cannot be created
     */
    TopicConnection createConnection() throws Exception;
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/PollError.java
  
  Index: PollError.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  /**
   * Describes an error that occurred while polling a subscription, as an error
   * code paired with a descriptive message. Both values are fixed at
   * construction time.
   *
   * @author Shane Bryzak
   */
  public class PollError
  {
    /** Error code used when no subscription exists for the supplied token. */
    public static final String ERROR_CODE_TOKEN_NOT_FOUND = "TOKEN_NOT_FOUND";

    /** Error code used when a JMS exception was raised while polling. */
    public static final String ERROR_CODE_JMS_EXCEPTION = "JMS_EXCEPTION";

    private final String code;
    private final String message;

    /**
     * Creates an error with the given code and message.
     *
     * @param code the error code, e.g. one of the <code>ERROR_CODE_*</code> constants
     * @param message a description of the error
     */
    public PollError(String code, String message)
    {
      this.message = message;
      this.code = code;
    }

    /**
     * @return the error code supplied at construction
     */
    public String getCode()
    {
      return this.code;
    }

    /**
     * @return the error message supplied at construction
     */
    public String getMessage()
    {
      return this.message;
    }
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/PollRequest.java
  
  Index: PollRequest.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import java.util.ArrayList;
  import java.util.List;
  import javax.jms.Message;
  import javax.jms.*;
  
  /**
   * Wrapper for a single request for a specified subscription poll. After
   * {@link #poll()} has run, the outcome is available through
   * {@link #getMessages()} and {@link #getErrors()}.
   *
   * @author Shane Bryzak
   */
  public class PollRequest
  {
    private final String token;
    private final int timeout;
    private final List<PollError> errors = new ArrayList<PollError>();
    private List<Message> messages;

    /**
     * @param token the token identifying the subscription to poll
     * @param timeout the timeout value handed to the subscriber's poll
     */
    public PollRequest(String token, int timeout)
    {
      this.token = token;
      this.timeout = timeout;
    }

    /**
     * @return the subscription token this request polls
     */
    public String getToken()
    {
      return this.token;
    }

    /**
     * @return the messages stored by the last successful {@link #poll()};
     *         <code>null</code> if no poll has stored a result
     */
    public List<Message> getMessages()
    {
      return this.messages;
    }

    /**
     * @return the errors recorded while polling; empty if none occurred
     */
    public List<PollError> getErrors()
    {
      return this.errors;
    }

    /**
     * Looks up the subscription for this request's token and polls it. A
     * missing subscription or a JMS failure is recorded in the error list
     * rather than thrown.
     */
    public void poll()
    {
      RemoteSubscriber target = SubscriptionRegistry.instance().getSubscription(token);

      // Nothing to poll: report the unknown token and stop.
      if (target == null)
      {
        errors.add(new PollError(PollError.ERROR_CODE_TOKEN_NOT_FOUND,
                                 "No subscription was found for the specified token."));
        return;
      }

      try
      {
        messages = target.poll(timeout);
      }
      catch (JMSException ex)
      {
        errors.add(new PollError(PollError.ERROR_CODE_JMS_EXCEPTION,
                                 "Error polling for messages"));
      }
    }
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/RemoteSubscriber.java
  
  Index: RemoteSubscriber.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import java.util.ArrayList;
  import java.util.List;
  
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.Topic;
  import javax.jms.TopicConnection;
  import javax.jms.TopicSession;
  import javax.jms.TopicSubscriber;
  
  /**
   * A single subscription to a JMS topic, identified by a token. Holds the
   * topic session and topic subscriber created by {@link #subscribe} and
   * releases them in {@link #unsubscribe()}.
   *
   * @author Shane Bryzak
   */
  public class RemoteSubscriber
  {
    private String token;
    private String topicName;

    private Topic topic;
    private TopicSession topicSession;
    private TopicSubscriber subscriber;

    /**
     * @param token the token identifying this subscription
     * @param topicName the name of the topic to subscribe to
     */
    public RemoteSubscriber(String token, String topicName)
    {
      this.token = token;
      this.topicName = topicName;
    }

    /**
     * @return the token identifying this subscription
     */
    public String getToken()
    {
      return token;
    }

    /**
     * @return the name of the topic this subscription is for
     */
    public String getTopicName()
    {
      return topicName;
    }

    /**
     * Creates a non-transacted, auto-acknowledge topic session on the given
     * connection and a subscriber for this subscription's topic.
     *
     * @param conn the connection to create the session on
     * @throws JMSException if the session, topic or subscriber cannot be created
     */
    public void subscribe(TopicConnection conn)
        throws JMSException
    {
      TopicSession session = conn.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
      boolean subscribed = false;
      try
      {
        Topic target = session.createTopic(topicName);
        TopicSubscriber created = session.createSubscriber(target);

        // Publish the new state only once every step has succeeded
        topicSession = session;
        topic = target;
        subscriber = created;
        subscribed = true;
      }
      finally
      {
        if (!subscribed)
        {
          // Do not leak the session when topic or subscriber creation failed
          try
          {
            session.close();
          }
          catch (JMSException ignored)
          {
            // best-effort cleanup; the original failure is the one to report
          }
        }
      }
    }

    /**
     * Closes the topic subscriber and the topic session, and removes this
     * subscription's token from the user's session context. Close failures
     * are ignored; the token is removed even if closing the subscriber fails.
     * Safe to call when {@link #subscribe} was never called or failed.
     */
    public void unsubscribe()
    {
      try
      {
        if (subscriber != null)
        {
          try
          {
            subscriber.close();
          }
          catch (JMSException ignored)
          {
            // best-effort close; the subscription is being discarded anyway
          }
        }

        // Remove the subscription's token from the user's session context
        SubscriptionRegistry.instance().getUserTokens().remove(token);
      }
      finally
      {
        if (topicSession != null)
        {
          try
          {
            topicSession.close();
          }
          catch (JMSException ignored)
          {
            // best-effort close; nothing useful can be done on failure
          }
        }
      }
    }

    public void setTopicSubscriber(TopicSubscriber subscriber)
    {
      this.subscriber = subscriber;
    }

    public TopicSubscriber getTopicSubscriber()
    {
      return subscriber;
    }

    /**
     * Drains the messages currently available to the subscriber. When
     * <code>timeout</code> is positive, waits up to that many seconds for the
     * first message; all further messages are fetched without waiting.
     *
     * @param timeout seconds to wait for the first message; zero or negative
     *        means do not wait at all
     * @return the received messages, or <code>null</code> if none were received
     * @throws JMSException if receiving a message fails
     */
    public List<Message> poll(int timeout)
        throws JMSException
    {
      List<Message> messages = null;

      Message m = null;

      synchronized(subscriber)
      {
        do {
          // Only timeout for the first message.. subsequent messages should be nowait
          if (messages == null && timeout > 0)
          {
            // Multiply as long: an int product can overflow to zero or a
            // negative value, and receive(0) blocks indefinitely
            m = subscriber.receive(timeout * 1000L);
          }
          else
          {
            m = subscriber.receiveNoWait();
          }

          if (m != null) {
            if (messages == null)
              messages = new ArrayList<Message> ();
            messages.add(m);
          }
        }
        while (m != null);
      }

      return messages;
    }
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/SubscriptionRegistry.java
  
  Index: SubscriptionRegistry.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import static org.jboss.seam.InterceptionType.NEVER;
  import static org.jboss.seam.annotations.Install.BUILT_IN;
  
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;
  import java.util.UUID;
  
  import javax.jms.ExceptionListener;
  import javax.jms.JMSException;
  import javax.jms.TopicConnection;
  
  import org.jboss.seam.log.LogProvider;
  import org.jboss.seam.log.Logging;
  import org.jboss.seam.Component;
  import org.jboss.seam.ScopeType;
  import org.jboss.seam.annotations.Install;
  import org.jboss.seam.annotations.Intercept;
  import org.jboss.seam.annotations.Name;
  import org.jboss.seam.annotations.Scope;
  import org.jboss.seam.contexts.Context;
  import org.jboss.seam.contexts.Contexts;
  import org.jboss.seam.util.Reflections;
  
  /**
   * Application-scoped registry of remote topic subscriptions. It lazily
   * creates the shared topic connection, keeps every subscriber keyed by its
   * token, and records the tokens owned by each user in that user's session
   * context.
   *
   * @author Shane Bryzak
   */
  @Scope(ScopeType.APPLICATION)
  @Intercept(NEVER)
  @Name("org.jboss.seam.remoting.messaging.subscriptionRegistry")
  @Install(value = false, precedence=BUILT_IN)
  public class SubscriptionRegistry
  {
    private static final String DEFAULT_CONNECTION_PROVIDER =
      "org.jboss.seam.remoting.messaging.JBossConnectionProvider";

    public static final String CONTEXT_USER_TOKENS =
        "org.jboss.seam.remoting.messaging.SubscriptionRegistry.userTokens";

    private static final LogProvider log = Logging.getLogProvider(SubscriptionRegistry.class);

    private String connectionProvider;

    private volatile TopicConnection topicConnection;

    private final Object monitor = new Object();

    /**
     * All subscribers keyed by token. This component is application scoped and
     * never intercepted, so every access is guarded by synchronizing on the map.
     */
    private final Map<String,RemoteSubscriber> subscriptions = new HashMap<String,RemoteSubscriber>();

    /**
     * Contains a list of all the topics that clients are allowed to subscribe to.
     */
    private Set<String> allowedTopics = new HashSet<String>();

    /**
     * Returns the registry component instance, creating it if necessary.
     *
     * @return the registry; never <code>null</code>
     * @throws IllegalStateException if no registry component is available
     */
    public static SubscriptionRegistry instance()
    {
      SubscriptionRegistry registry = (SubscriptionRegistry) Component.getInstance(SubscriptionRegistry.class, true);

      if (registry == null)
      {
        throw new IllegalStateException("No SubscriptionRegistry exists");
      }

      return registry;
    }

    public Set<String> getAllowedTopics()
    {
      return allowedTopics;
    }

    public void setAllowedTopics(Set<String> allowedTopics)
    {
      this.allowedTopics = allowedTopics;
    }

    public String getConnectionProvider()
    {
      return connectionProvider;
    }

    public void setConnectionProvider(String connectionProvider)
    {
      this.connectionProvider = connectionProvider;
    }

    /**
     * Returns the shared topic connection, creating and starting it on first
     * use (double-checked locking on the volatile field).
     *
     * @return the started topic connection
     * @throws Exception if the provider cannot be loaded or the connection
     *         cannot be created or started
     */
    private TopicConnection getTopicConnection()
      throws Exception
    {
      if (topicConnection == null)
      {
        synchronized(monitor)
        {
          if (topicConnection == null)
          {
            // Assign only a fully started connection, so that a failed start
            // is retried on the next call instead of being cached.
            topicConnection = openTopicConnection();
          }
        }
      }
      return topicConnection;
    }

    /**
     * Instantiates the configured connection provider (or the default one),
     * creates a topic connection from it and starts the connection.
     */
    private TopicConnection openTopicConnection()
      throws Exception
    {
      String providerName = connectionProvider != null ?
                                connectionProvider : DEFAULT_CONNECTION_PROVIDER;

      JMSConnectionProvider provider;
      try {
        Class<?> providerClass = Reflections.classForName(providerName);
        provider = (JMSConnectionProvider) providerClass.newInstance();
      }
      catch (ClassNotFoundException ex)
      {
        log.error(String.format("Topic connection provider class [%s] not found",
                                providerName), ex);
        throw ex;
      }
      catch (InstantiationException ex)
      {
        log.error(String.format("Failed to create connection provider [%s]",
                                providerName), ex);
        throw ex;
      }

      TopicConnection connection = provider.createConnection();
      boolean started = false;
      try
      {
        connection.setExceptionListener(new ExceptionListener() {
          public void onException(JMSException ex)
          {
            // TODO do we need to try and reconnect? For now just record the failure.
            log.error("The JMS topic connection reported an exception", ex);
          }
        });
        connection.start();
        started = true;
        return connection;
      }
      finally
      {
        if (!started)
        {
          // Release the half-initialised connection; the original failure propagates
          try
          {
            connection.close();
          }
          catch (JMSException ignored)
          {
            // best-effort cleanup only
          }
        }
      }
    }

    /**
     * Creates a subscription to the given topic, registers it under a new
     * random token and records that token in the current user's session.
     *
     * @param topicName the topic to subscribe to; must be an allowed topic
     * @return the new subscriber, or <code>null</code> if the subscription
     *         could not be created (the failure is logged)
     * @throws IllegalArgumentException if the topic is not an allowed topic
     */
    public RemoteSubscriber subscribe(String topicName)
    {
      if (!allowedTopics.contains(topicName))
        throw new IllegalArgumentException(String.format(
          "Cannot subscribe to a topic that is not allowed. Topic [%s] is not an " +
          "allowed topic.", topicName));

      RemoteSubscriber sub = new RemoteSubscriber(UUID.randomUUID().toString(), topicName);

      try {
        // Resolve the session's token set before creating any JMS resources,
        // so a missing session context cannot leave an orphaned subscriber.
        Set<String> userTokens = getUserTokens();

        sub.subscribe(getTopicConnection());
        synchronized(subscriptions)
        {
          subscriptions.put(sub.getToken(), sub);
        }

        // Save the client's token in their session context
        userTokens.add(sub.getToken());

        return sub;
      }
      catch (Exception ex) {
        log.error(String.format("Failed to subscribe to topic [%s]", topicName), ex);
        return null;
      }
    }

    /**
     * Returns the set of subscription tokens stored in the current session
     * context, creating and storing an empty set if none exists yet.
     *
     * @return the mutable set of tokens belonging to the current session
     */
    public Set<String> getUserTokens()
    {
      Context session = Contexts.getSessionContext();
      if (session.get(CONTEXT_USER_TOKENS) == null)
      {
        synchronized(session)
        {
          if (session.get(CONTEXT_USER_TOKENS) == null)
            session.set(CONTEXT_USER_TOKENS, new HashSet<String> ());
        }
      }

      // Within this class, only a HashSet<String> is ever stored under this key
      @SuppressWarnings("unchecked")
      Set<String> tokens = (Set<String>) session.get(CONTEXT_USER_TOKENS);
      return tokens;
    }

    /**
     * Returns the subscriber registered under the given token, provided the
     * token belongs to the current session.
     *
     * @param token the subscription token
     * @return the subscriber, or <code>null</code> if the registry holds no
     *         subscriber for the token
     * @throws IllegalArgumentException if the token is not among the current
     *         session's tokens
     */
    public RemoteSubscriber getSubscription(String token)
    {
      if (!getUserTokens().contains(token))
        throw new IllegalArgumentException(
          "Invalid token argument - token not found in Session Context.");

      synchronized(subscriptions)
      {
        return subscriptions.get(token);
      }
    }
  }
  
  
  
  1.1      date: 2007/02/27 22:15:24;  author: sbryzak2;  state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/SubscriptionRequest.java
  
  Index: SubscriptionRequest.java
  ===================================================================
  package org.jboss.seam.remoting.messaging;
  
  import java.io.IOException;
  import java.io.OutputStream;
  
  /**
   * A request to subscribe to a topic. {@link #subscribe()} creates the
   * subscription through the {@link SubscriptionRegistry}, after which
   * {@link #marshal} writes the topic name and subscription token as XML.
   *
   * @author Shane Bryzak
   */
  public class SubscriptionRequest
  {
    private String topicName;
    private RemoteSubscriber subscriber;

    /**
     * @param topicName the name of the topic to subscribe to
     */
    public SubscriptionRequest(String topicName)
    {
      this.topicName = topicName;
    }

    /**
     * Subscribes to the topic through the subscription registry and keeps the
     * resulting subscriber for {@link #marshal}.
     */
    public void subscribe()
    {
      subscriber = SubscriptionRegistry.instance().subscribe(topicName);
    }

    /**
     * Writes a <code>&lt;subscription topic="..." token="..."/&gt;</code>
     * element describing this subscription to the stream, then flushes it.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     * @throws IllegalStateException if no subscription has been established,
     *         i.e. {@link #subscribe()} was not called or did not succeed
     */
    public void marshal(OutputStream out)
        throws IOException
    {
      if (subscriber == null)
      {
        throw new IllegalStateException(
          "Cannot marshal subscription for topic [" + topicName +
          "] - no subscription has been established.");
      }

      StringBuilder xml = new StringBuilder();
      xml.append("<subscription topic=\"");
      xml.append(escapeAttribute(topicName));
      xml.append("\" token=\"");
      xml.append(escapeAttribute(subscriber.getToken()));
      xml.append("\"/>");

      // Encode explicitly rather than with the platform default charset, so the
      // output does not vary between hosts; UTF-8 is what an XML parser assumes
      // when no encoding is declared.
      out.write(xml.toString().getBytes("UTF-8"));
      out.flush();
    }

    /**
     * Escapes the characters that are not allowed to appear literally inside a
     * double-quoted XML attribute value.
     */
    private static String escapeAttribute(String value)
    {
      StringBuilder escaped = new StringBuilder(value.length());
      for (int i = 0; i < value.length(); i++)
      {
        char c = value.charAt(i);
        switch (c)
        {
          case '&': escaped.append("&amp;"); break;
          case '<': escaped.append("&lt;"); break;
          case '>': escaped.append("&gt;"); break;
          case '"': escaped.append("&quot;"); break;
          case '\'': escaped.append("&apos;"); break;
          default: escaped.append(c);
        }
      }
      return escaped.toString();
    }
  }
  
  
  



More information about the jboss-cvs-commits mailing list