
Distribute CDI Events between different EARs

A typical JEE application consists of, or is divided into, several EARs or WARs to separate concerns or to decouple components during updates. Usually two EARs, even within the same VM, communicate via REST, JMS or (hopefully not :)) via remote EJB.

Since CDI stepped into our enterprise world, developers have come to love it. Nearly everything can be extended, produced and injected. This is great, but still not enough for big enterprise applications distributed over several (component) EARs.

And here comes the requirement:

I would still like to spread my CDI events to the neighbouring EARs as well. There are several ways to do this manually. With JEE7 and its integration of WebSockets, a very simple and more or less generic approach suggests itself.

Distribute CDI events via WebSockets

First we assume a core.ear and a component.ear. Events can be distributed in both directions (it’s the nature of WebSockets), but typically we need only one. In our example we’d like to send a FeatureEnabled event from the core EAR to the component EAR.

First of all (if we don’t have just simple WARs), both EARs need to be bundled with a shared-api.jar, which holds the event domain objects. The core.ear needs to provide an internal WAR, core-web.war, which hosts the WebSocket (JSR 356) endpoint for transporting the events, and typically a core-ejb.jar, where a bean publishes the CDI events. The component.ear just comes with a simple component-ejb.jar to pick up the triggered events.

So let the code speak:

shared-api.jar

We create a marker interface, e.g. called DistributedEvent. This is necessary because CDI events are resolved by type: the marker gives the WebSocket endpoint one common type to observe, so that it picks up exactly the events meant for distribution instead of every object fired in the container.

package de.luxig.distevent.domain;

public interface DistributedEvent {
}
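To illustrate what the marker type buys us, here is a small plain-Java simulation of type-based observer resolution. It is only a sketch and not CDI: MiniBus, CacheCleared and the method names are hypothetical stand-ins, and a real container resolves observers in a more elaborate way.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class MarkerDispatchSketch {

    /** Marker type, mirroring DistributedEvent from the article. */
    interface DistributedEvent {}

    /** An event meant for distribution. */
    static final class FeatureEnabled implements DistributedEvent {
        final String feature;
        FeatureEnabled(String feature) { this.feature = feature; }
    }

    /** A purely local event (hypothetical), which must not leave the EAR. */
    static final class CacheCleared {}

    /** Hypothetical stand-in for the container: notifies observers whose type matches. */
    static final class MiniBus {
        private final Map<Class<?>, List<Consumer<Object>>> observers = new LinkedHashMap<>();

        <T> void observe(Class<T> type, Consumer<? super T> observer) {
            observers.computeIfAbsent(type, k -> new ArrayList<>())
                     .add(e -> observer.accept(type.cast(e)));
        }

        void fire(Object event) {
            observers.forEach((type, list) -> {
                if (type.isInstance(event)) {
                    list.forEach(o -> o.accept(event));
                }
            });
        }
    }

    /** Fires one distributed and one local event; returns what the marker observer saw. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        MiniBus bus = new MiniBus();
        bus.observe(DistributedEvent.class, e -> seen.add(e.getClass().getSimpleName()));
        bus.fire(new FeatureEnabled("demo"));
        bus.fire(new CacheCleared());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [FeatureEnabled]
    }
}
```

Only the event implementing the marker reaches the observer; the local CacheCleared event stays where it is.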

Now, of course, we need some meaningful distributed CDI event, our FeatureEnabled event:

package de.luxig.distevent.domain;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class FeatureEnabled implements DistributedEvent {
    private final String feature;

    @JsonCreator
    public FeatureEnabled(@JsonProperty("feature") String feature) {
        this.feature = feature;
    }

    public String getFeature() {
        return this.feature;
    }
}

Since the events are transported over the (local) wire via WebSockets, we need to serialize and deserialize them. I decided to use Jackson for this task.

core-ejb.jar

Let’s create a service to fire the event. We assume the service is part of the base implementation of our core.ear:

package de.luxig.distevent.service;

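import javax.ejb.Stateless;
import javax.enterprise.event.Event;
import javax.inject.Inject;

import de.luxig.distevent.domain.FeatureEnabled;

@Stateless
public class FeatureServiceBean implements FeatureService {
  @Inject
  private Event<FeatureEnabled> event;

  @Override
  public void enableFeature(String name) {
    // fire a plain CDI event; the WebSocket endpoint in core-web.war observes it
    event.fire(new FeatureEnabled(name));
  }
}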

That’s all for the EJB part.
The FeatureService interface is not shown here. It’s just glue code that lets us inject the EJB later to test the event distribution. Usually FeatureService would be bundled in a separate core-api.jar, which is a lib of core.ear.

core-web.war

So far so good. Now we reach the interesting part. The core-web.war has to provide the WebSocket server endpoint that clients can connect to. Hence we create a proper javax.websocket.server.ServerEndpoint.

package de.luxig.distevent.web;

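import java.util.LinkedHashSet;
import java.util.Set;

import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.websocket.OnClose;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import de.luxig.distevent.domain.DistributedEvent;

@ServerEndpoint(value="/events", encoders=JacksonEncoder.class)
@javax.ejb.Singleton
public class DistributedEventServerEndpoint {
  // sessions of all currently connected clients
  private final Set<Session> sessions = new LinkedHashSet<>();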

  @OnOpen
  public void registerSession(Session session) {
    this.sessions.add(session);
  }

  @OnClose
  public void unregisterSession(Session session) {
    this.sessions.remove(session);
  }

  /**
   * Catch up any implementation of DistributedEvent and spread it out.
   */
  public void publish(@Observes @Any DistributedEvent event) {
    // distribute events asynchronously (sync is also possible)
    sessions.forEach(s -> s.getAsyncRemote().sendObject(event));
  }
}

The DistributedEventServerEndpoint is managed as a singleton EJB so that it can keep track of the active client WebSocket sessions. It observes (@Observes) any type of DistributedEvent and pushes it over the WebSocket, which the container exposes at ws://localhost:8080/events (assuming core-web.war is deployed under the root context). Any attached client will be able to receive it. The configured JacksonEncoder serializes the CDI event into a transportable String:

package de.luxig.distevent.web;

import javax.websocket.EncodeException;
import javax.websocket.Encoder.Text;
import javax.websocket.EndpointConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import de.luxig.distevent.domain.DistributedEvent;

public class JacksonEncoder implements Text<DistributedEvent> {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public void init(EndpointConfig config) {
  }

  @Override
  public void destroy() {
  }

  @Override
  public String encode(DistributedEvent event) throws EncodeException {
    try {
      // frame format: <event class name>|<JSON payload>
      return event.getClass().getName() + '|' + MAPPER.writeValueAsString(event);
    } catch (JsonProcessingException e) {
      throw new EncodeException(event, "Could not be encoded to JSON", e);
    }
  }
}

If we listen on the WebSocket directly (e.g. with a suitable browser plugin), we get messages of the form:

de.luxig.distevent.domain.FeatureEnabled|{"feature":"Distribute event feature"}
de.luxig.distevent.domain.FeatureEnabled|{"feature":"Just another feature"}
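The framing of these messages can be sketched in plain Java. This is a minimal illustration of the class-name-plus-JSON format only; WireFrameSketch and Frame are hypothetical names, and the JSON payload is treated as an opaque String here. Note that the message is split at the first '|' only, since a class name never contains that character while the JSON payload may.

```java
final class WireFrameSketch {

    /** Parsed frame: event class name plus raw JSON payload. */
    static final class Frame {
        final String className;
        final String json;
        Frame(String className, String json) {
            this.className = className;
            this.json = json;
        }
    }

    /** Builds a message of the form <class name>|<JSON>. */
    static String frame(String className, String json) {
        return className + '|' + json;
    }

    /** Splits a message at the first '|' into class name and JSON payload. */
    static Frame parse(String message) {
        int sep = message.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("Missing '|' separator: " + message);
        }
        return new Frame(message.substring(0, sep), message.substring(sep + 1));
    }

    public static void main(String[] args) {
        Frame f = parse(frame("de.luxig.distevent.domain.FeatureEnabled", "{\"feature\":\"a|b\"}"));
        System.out.println(f.className); // prints de.luxig.distevent.domain.FeatureEnabled
        System.out.println(f.json);      // prints {"feature":"a|b"}
    }
}
```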

Now we just create a REST service so that we can trigger our EJB service bean. This is just glue code, of course:

package de.luxig.distevent.web;

import javax.ws.rs.POST;
// import ...

@Path("/distribute")
public class DistributedEventService {

  @Inject
  private FeatureService service;

  @POST
  public void enable(String name) {
    service.enableFeature(name);
  }

}

Well, the core.ear is complete now. We can bundle and deploy it. On POSTing "New Feature" to http://localhost:8080/distribute, a FeatureEnabled event is fired and pushed to the WebSocket at ws://localhost:8080/events.
Let’s have a look at the component.ear, which will listen on it.

component-ejb.jar

On the component side, we need someone to listen on the WebSocket and fire the incoming event message back into the CDI container. Let’s start with the trivial part, the simple DistributedEventListener.

package de.luxig.distevent.component;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import de.luxig.distevent.domain.DistributedEvent;
import de.luxig.distevent.domain.FeatureEnabled;

@ApplicationScoped
public class DistributedEventListener {

  @Inject
  private Logger logger;

  public void observeGeneral(@Observes DistributedEvent event) {
    logger.info("Got distributed event {}", event);
  }

  public void observeFeatureEnabled(@Observes FeatureEnabled event) {
    logger.info("New feature enabled {}", event.getFeature());
  }
}

This event listener is notified whenever any DistributedEvent comes in, and also when a concrete FeatureEnabled event is recognized. On the component side we obviously need a connection to the WebSocket as well.

package de.luxig.distevent.component;
import javax.enterprise.inject.spi.CDI;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import de.luxig.distevent.domain.DistributedEvent;

@ClientEndpoint(decoders = JacksonDecoder.class)
public class DistributedEventClientEndpoint {
  private static final Logger LOG = LoggerFactory.getLogger(DistributedEventClientEndpoint.class);

  @OnMessage
  public void receive(DistributedEvent event) {
    LOG.info("Got event {} and pass through", event);
    CDI.current().getBeanManager().fireEvent(event);
  }
}

Here I noticed an issue, at least within the GlassFish container: the ClientEndpoint instance is not managed by CDI. This is not nice, so in practice we have to leave this ugly class quickly :). That is why, on receiving the DistributedEvent WebSocket message, we immediately fire it back into CDI via a programmatic lookup. From then on we are within the CDI world of component.ear, which makes the event distributed over two EARs.

Gotcha!
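The complete path of an event (fire, observe, encode, wire, decode, re-fire, observe) can be simulated without any container. The following is a toy sketch under loud assumptions: Container is a hypothetical stand-in for one EAR's CDI container, a Queue stands in for the WebSocket connection, and the "encoding" just passes the feature name instead of using Jackson.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class EventBridgeSketch {

    interface DistributedEvent {}

    static final class FeatureEnabled implements DistributedEvent {
        final String feature;
        FeatureEnabled(String feature) { this.feature = feature; }
    }

    /** Hypothetical stand-in for the CDI container of one EAR. */
    static final class Container {
        private final List<Consumer<DistributedEvent>> observers = new ArrayList<>();
        void observe(Consumer<DistributedEvent> observer) { observers.add(observer); }
        void fire(DistributedEvent event) { observers.forEach(o -> o.accept(event)); }
    }

    /** Sends one event from "core" to "component"; returns what the component observed. */
    static List<String> run() {
        Queue<String> wire = new ArrayDeque<>(); // stands in for the WebSocket connection
        Container core = new Container();
        Container component = new Container();
        List<String> received = new ArrayList<>();

        // core side: the server endpoint observes the event and writes it to the wire
        core.observe(e -> wire.add(((FeatureEnabled) e).feature));
        // component side: an ordinary observer, unaware of any transport
        component.observe(e -> received.add(((FeatureEnabled) e).feature));

        // core side: a service fires the event as usual
        core.fire(new FeatureEnabled("Distribute event feature"));

        // component side: the client endpoint decodes each message and re-fires it locally
        while (!wire.isEmpty()) {
            component.fire(new FeatureEnabled(wire.poll()));
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Distribute event feature]
    }
}
```

Neither the firing service nor the final observer knows about the wire in between, which is the point of the whole construction.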

Now some additional glue code. We need to activate the client endpoint, even though we "live" in a managed container. I guess the client endpoint was originally designed for standalone clients. In a common WebSocket use case, the client endpoint is not needed, because the browser opens the client connection. Well, we have to deal with it:

package de.luxig.distevent.component;
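import java.io.IOException;
import java.net.URI;

import javax.annotation.PostConstruct;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Startup
public class ComponentActivator {
  private static final Logger logger = LoggerFactory.getLogger(ComponentActivator.class);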

  @PostConstruct
  public void init() {
    String uri = "ws://localhost:8080/events";
    try {
      ContainerProvider.getWebSocketContainer().connectToServer(DistributedEventClientEndpoint.class, URI.create(uri));
    } catch (DeploymentException | IOException e) {
      logger.error("Unable to connect to WebSocket endpoint at " + uri, e);
    }
  }
}

The ComponentActivator runs at startup/deployment time of component.ear and connects to the WebSocket at ws://localhost:8080/events.
We additionally need a decoder for our distributed event WebSocket messages, like this:

package de.luxig.distevent.component;
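import java.io.IOException;

import javax.websocket.DecodeException;
import javax.websocket.Decoder.Text;
import javax.websocket.EndpointConfig;

// assumed: StringUtils comes from Apache Commons Lang 3
import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

import de.luxig.distevent.domain.DistributedEvent;

public class JacksonDecoder implements Text<DistributedEvent> {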
  private static final ObjectMapper MAPPER = new ObjectMapper();

  @Override
  public void init(EndpointConfig config) {
  }

  @Override
  public void destroy() {
  }

  @Override
  public DistributedEvent decode(String s) throws DecodeException {
    String className = StringUtils.substringBefore(s, "|");
    String json = StringUtils.substringAfter(s, "|");
    try {
      Class<?> type = Class.forName(className);
      if (!DistributedEvent.class.isAssignableFrom(type)) {
        throw new DecodeException(s, "Event type " + className + " does not implement " + DistributedEvent.class.getName());
      }
      return (DistributedEvent) MAPPER.readValue(json, type);
    } catch (ClassNotFoundException e) {
      throw new DecodeException(s, "Unable to load class " + className, e);
    } catch (IOException e) {
      throw new DecodeException(s, "Unable to decode JSON " + json, e);
    }
  }

  @Override
  public boolean willDecode(String s) {
    return true;
  }

}
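The willDecode method above accepts every message. If that is too lenient, the check could be tightened along the following lines. This is a plain-Java sketch, not part of the original decoder: DecodeGuardSketch is a hypothetical name, and the nested DistributedEvent and FeatureEnabled types only stand in for the shared-api classes.

```java
final class DecodeGuardSketch {

    /** Stand-ins for the shared-api types, so that the sketch is self-contained. */
    interface DistributedEvent {}
    static final class FeatureEnabled implements DistributedEvent {}

    /**
     * Accepts a message only if it has the form <class name>|<payload>
     * and the named class can be loaded and implements DistributedEvent.
     */
    static boolean willDecode(String s) {
        if (s == null) {
            return false;
        }
        int sep = s.indexOf('|');
        if (sep <= 0) {
            return false; // no separator, or empty class name
        }
        try {
            // load without initializing: we only want to inspect the type
            Class<?> type = Class.forName(s.substring(0, sep), false,
                    DecodeGuardSketch.class.getClassLoader());
            return DistributedEvent.class.isAssignableFrom(type);
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(willDecode(FeatureEnabled.class.getName() + "|{}")); // prints true
        System.out.println(willDecode("java.lang.String|{}"));                  // prints false
        System.out.println(willDecode("no separator here"));                    // prints false
    }
}
```

With such a guard, messages that do not name a known event type are rejected before decode is attempted.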

Now the component.ear can be packaged and deployed anywhere (also on remote servers) or, as usual, alongside core.ear.
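For a remote deployment, the hard-coded ws://localhost:8080/events in the ComponentActivator would have to become configurable. One possible sketch, assuming a system property is an acceptable configuration source: the property name distevent.endpoint.uri and the class EndpointUriSketch are made up for this example.

```java
import java.net.URI;

final class EndpointUriSketch {

    /** Hypothetical property name; not defined anywhere in the article's code. */
    static final String PROPERTY = "distevent.endpoint.uri";

    /** Fallback taken from the article's local setup. */
    static final String DEFAULT_URI = "ws://localhost:8080/events";

    /** Reads the endpoint URI from the system property, falling back to the local default. */
    static URI resolve() {
        URI uri = URI.create(System.getProperty(PROPERTY, DEFAULT_URI));
        String scheme = uri.getScheme();
        if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
            throw new IllegalArgumentException("Not a WebSocket URI: " + uri);
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

The activator could then pass the result of resolve() to connectToServer instead of the fixed String.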

Finally

That’s it! The CDI events are transported in a generic way to every EAR or WAR connecting to the WebSocket. All you have to do is fire the events via CDI on the sender side and @Observes them on the consumer side. Just like we do all day!

If both sides know the Jackson encoder and decoder, the component.ear can also distribute events back to core.ear. It’s the nature of WebSockets and quite straightforward: one has to implement @OnMessage within the ServerEndpoint as well.