A typical JEE application is divided into several EARs or WARs to separate concerns or to decouple components during updates. Usually two EARs, even within the same VM, communicate via REST, JMS or (hopefully not :)) via remote EJB.
Since CDI stepped into our enterprise world, developers have loved it. Nearly everything can be extended, produced and injected. This is great, but still not enough for big enterprise applications that are distributed over several (component) EARs.
And here comes the requirement:
I would still like to spread my CDI events to the neighbouring EARs as well. There are several ways to do this manually. With JEE7 and its WebSocket integration, a very simple and more or less generic approach practically suggests itself.
Distribute CDI events via WebSockets
First we assume a core.ear and a component.ear. Events can be distributed in both directions (it’s the nature of WebSockets), but typically we have only one direction. In our example we’d like to send a FeatureEnabled event from the core EAR to the component EAR.
First of all (if we don’t have just simple WARs), both EARs need to be bundled with a shared-api.jar, which holds the event domain objects. The core.ear needs to provide an internal WAR, core-web.war, which creates the WebSocket endpoint (Java API for WebSocket, JSR 356) for transporting the events, and typically a core-ejb.jar, where a bean publishes the CDI events. The component.ear just comes with a simple component-ejb.jar to catch the triggered events.
So let the code speak up:
shared-api.jar
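We create a marker interface, e.g. called DistributedEvent. This is necessary because CDI resolves observers by event type: with a common marker type, an observer can pick up exactly the events that are meant for distribution. Observing java.lang.Object instead would also match every other event fired in the application.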
package de.luxig.distevent.domain;

public interface DistributedEvent {
}
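To make the role of the marker type a bit more tangible, here is a toy dispatcher written against the plain JDK. It is not CDI and says nothing about how a container resolves observers internally; the class TypedDispatchSketch and its observe and fire methods are invented for this illustration. It only mimics the rule that an observer is notified when the fired object is an instance of the observed type:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy dispatcher, NOT CDI: only mimics "observers are selected by event type". */
final class TypedDispatchSketch {

    /** Stand-in for the marker interface from shared-api.jar. */
    interface DistributedEvent { }

    /** Stand-in for a concrete distributed event. */
    static final class FeatureEnabled implements DistributedEvent {
        final String feature;
        FeatureEnabled(String feature) { this.feature = feature; }
    }

    private final Map<Class<?>, List<Consumer<Object>>> observers = new LinkedHashMap<>();

    /** Registers an observer for the given type and all of its subtypes. */
    <T> void observe(Class<T> type, Consumer<? super T> observer) {
        observers.computeIfAbsent(type, t -> new ArrayList<>())
                 .add(event -> observer.accept(type.cast(event)));
    }

    /** Notifies every observer whose observed type matches; returns the number notified. */
    int fire(Object event) {
        int notified = 0;
        for (Map.Entry<Class<?>, List<Consumer<Object>>> entry : observers.entrySet()) {
            if (entry.getKey().isInstance(event)) {
                for (Consumer<Object> observer : entry.getValue()) {
                    observer.accept(event);
                    notified++;
                }
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        TypedDispatchSketch bus = new TypedDispatchSketch();
        bus.observe(DistributedEvent.class,
                e -> System.out.println("distributed: " + e.getClass().getSimpleName()));
        bus.observe(FeatureEnabled.class,
                e -> System.out.println("feature: " + e.feature));
        bus.fire(new FeatureEnabled("demo"));       // matches both observers
        bus.fire("a local, non-distributed event"); // matches none
    }
}
```

An observer registered for the marker type sees every event that implements it, while purely local events pass by unnoticed. That is the property a generic forwarding observer needs.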
Now, of course, we need some meaningful distributed CDI event, our FeatureEnabled event:
package de.luxig.distevent.domain;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// a class, not an interface: it carries state and a constructor
public class FeatureEnabled implements DistributedEvent {

    private final String feature;

    @JsonCreator
    public FeatureEnabled(@JsonProperty("feature") String feature) {
        this.feature = feature;
    }

    public String getFeature() {
        return this.feature;
    }
}
Since the events are transported over the (local) wire via WebSockets, we need to serialize and deserialize them. I decided to use Jackson (JSON) for this task.
core-ejb.jar
Let’s create a service to spread out the event. We assume the service is a base implementation of our core.ear:
package de.luxig.distevent.service;

// import ...

@Stateless
public class FeatureServiceBean implements FeatureService {

    @Inject
    private Event<FeatureEnabled> event;

    @Override
    public void enableFeature(String name) {
        // fire the shared event type so that DistributedEvent observers pick it up
        event.fire(new FeatureEnabled(name));
    }
}
That’s all for the EJB part.
The FeatureService interface is not shown here. It’s just glue code that lets us inject the EJB later on to test the event distribution. Usually the FeatureService would be bundled in a separate core-api.jar, which is a lib of core.ear.
core-web.war
So far so good. Now we reach the interesting part. The core-web.war has to provide the WebSocket server endpoint that clients can connect to. Hence we create a proper javax.websocket.server.ServerEndpoint.
package de.luxig.distevent.web;

import javax.websocket.server.ServerEndpoint;
// import ...;

@ServerEndpoint(value = "/events", encoders = JacksonEncoder.class)
@javax.ejb.Singleton
public class DistributedEventServerEndpoint {

    // typed set, so that the lambda below sees Session and not Object
    private final Set<Session> sessions = new LinkedHashSet<>();

    @OnOpen
    public void registerSession(Session session) {
        this.sessions.add(session);
    }

    @OnClose
    public void unregisterSession(Session session) {
        this.sessions.remove(session);
    }

    /**
     * Catch up any implementation of DistributedEvent and spread it out.
     */
    public void publish(@Observes @Any DistributedEvent event) {
        // distribute events asynchronously (sync is also possible)
        sessions.forEach(s -> s.getAsyncRemote().sendObject(event));
    }
}
The DistributedEventServerEndpoint is managed as a singleton EJB so that it can keep track of the active client WebSocket sessions. We @Observes any type of DistributedEvent and push it over the WebSocket, which the container exposes at ws://localhost:8080/events. Any attached client will receive it. The configured JacksonEncoder serializes the CDI event into a transportable String:
package de.luxig.distevent.web;

// import ...
import javax.websocket.Encoder.Text;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonEncoder implements Text<DistributedEvent> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void init(EndpointConfig config) {
    }

    @Override
    public void destroy() {
    }

    @Override
    public String encode(DistributedEvent event) throws EncodeException {
        try {
            // wire format: <fully qualified class name>|<JSON payload>
            return event.getClass().getName() + '|' + MAPPER.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EncodeException(event, "Could not be encoded to JSON", e);
        }
    }
}
If we listen on the WebSocket directly (e.g. with a suitable browser plugin), we get messages of the form:
de.luxig.distevent.domain.FeatureEnabled|{"feature":"Distribute event feature"}
de.luxig.distevent.domain.FeatureEnabled|{"feature":"Just another feature"}
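Each message is simply the fully qualified class name, a pipe, and the JSON payload. As a rough JDK-only illustration of that layout (the helper EventFrame and its methods frame, typeOf and payloadOf are made up for this sketch and are not part of the code in this post), a message can be taken apart at the first pipe like this:

```java
/** Hypothetical helper, not part of the original code: handles "className|json" frames. */
final class EventFrame {

    private EventFrame() { }

    /** Builds a frame from a class name and an already serialized JSON payload. */
    static String frame(String className, String json) {
        return className + '|' + json;
    }

    /** Returns everything before the first '|', i.e. the event class name. */
    static String typeOf(String frame) {
        return frame.substring(0, separatorIndex(frame));
    }

    /** Returns everything after the first '|', i.e. the JSON payload. */
    static String payloadOf(String frame) {
        return frame.substring(separatorIndex(frame) + 1);
    }

    private static int separatorIndex(String frame) {
        int idx = frame.indexOf('|');
        if (idx < 0) {
            throw new IllegalArgumentException("Missing '|' separator: " + frame);
        }
        return idx;
    }

    public static void main(String[] args) {
        // The payload deliberately contains a pipe to show that it survives the split.
        String frame = frame("de.luxig.distevent.domain.FeatureEnabled",
                "{\"feature\":\"a|b\"}");
        System.out.println(typeOf(frame));
        System.out.println(payloadOf(frame));
    }
}
```

Splitting at the first pipe only is what keeps this safe: a Java class name cannot contain a pipe, whereas the JSON payload may well do.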
Now we just create a REST service to be able to trigger our EJB service bean. This is just glue code, of course:
package de.luxig.distevent.web;

import javax.ws.rs.POST;
// import ...

@Path("/distribute")
public class DistributedEventService {

    @Inject
    private FeatureService service;

    @POST
    public void enable(String name) {
        service.enableFeature(name);
    }
}
Well, the core.ear is complete now. We can bundle and deploy it. On POSTing "New Feature" to http://localhost:8080/distribute, a FeatureEnabled event is fired and pushed to the WebSocket at ws://localhost:8080/events.
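To trigger the REST service without a browser, a small client can do the POST. The following is only a sketch under a few assumptions: it uses the java.net.http client, which needs Java 11 or newer (later than the Java EE 7 setup described here), it sends the name as text/plain because the resource above declares no media type, and the class EnableFeatureClient is invented for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical test client, not part of the original code. */
final class EnableFeatureClient {

    private EnableFeatureClient() { }

    /** Builds a POST to <baseUrl>/distribute whose body is the plain feature name. */
    static HttpRequest enableRequest(String baseUrl, String featureName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/distribute"))
                .header("Content-Type", "text/plain") // assumption: plain text body
                .POST(HttpRequest.BodyPublishers.ofString(featureName))
                .build();
    }

    /** Sends the request and returns the HTTP status; needs a running core.ear. */
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = enableRequest("http://localhost:8080", "New Feature");
        // Only prints the request; call send(request) once the server is up.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Calling send(request) against a deployed core.ear should make every connected WebSocket client receive one FeatureEnabled message; the exact status code returned for the void resource method depends on the container.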
Let’s have a look at the component.ear, which will listen on it.
component-ejb.jar
On the component side, we need someone to listen on the WebSocket and fire the incoming event message back into the CDI container. Let’s start with the trivial part, the simple DistributedEventListener.
package de.luxig.distevent.component;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.slf4j.Logger;
import de.luxig.distevent.domain.DistributedEvent;
import de.luxig.distevent.domain.FeatureEnabled;

@ApplicationScoped
public class DistributedEventListener {

    // requires a CDI producer for Logger (not shown)
    @Inject
    private Logger logger;

    public void observeGeneral(@Observes DistributedEvent event) {
        logger.info("Got distributed event {}", event);
    }

    public void observeFeatureEnabled(@Observes FeatureEnabled event) {
        logger.info("New feature enabled {}", event.getFeature());
    }
}
This event listener gets notified when any DistributedEvent comes in, and also when a concrete FeatureEnabled event is recognized. On the component side we obviously need a connection to the WebSocket as well.
package de.luxig.distevent.component;

import javax.enterprise.inject.spi.CDI;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.luxig.distevent.domain.DistributedEvent;

@ClientEndpoint(decoders = JacksonDecoder.class)
public class DistributedEventClientEndpoint {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedEventClientEndpoint.class);

    @OnMessage
    public void receive(DistributedEvent event) {
        LOG.info("Got event {} and pass through", event);
        // this instance is not CDI-managed, so look up the BeanManager programmatically
        CDI.current().getBeanManager().fireEvent(event);
    }
}
Here I noticed an issue, at least within the GlassFish container: the ClientEndpoint instance is not managed by CDI. This is not nice, so in practice we have to get out of this ugly class quickly :). On receiving the DistributedEvent WebSocket message, we immediately fire it back into CDI. From then on we are within the CDI world of the component.ear, which makes the event distributed over two EARs.
Gotcha!
Now some additional glue code. We need to activate the client endpoint ourselves, even though we "live" in a managed container. I guess the client endpoint was originally designed for standalone clients. In a common WebSocket use case the client endpoint is not needed, because the browser opens the client connection. Well, we have to deal with it:
package de.luxig.distevent.component;

// import ...

@Singleton
@Startup
public class ComponentActivator {

    private static final Logger LOG = LoggerFactory.getLogger(ComponentActivator.class);

    @PostConstruct
    public void init() {
        String uri = "ws://localhost:8080/events";
        try {
            // open the client connection as soon as component.ear is deployed
            ContainerProvider.getWebSocketContainer()
                    .connectToServer(DistributedEventClientEndpoint.class, URI.create(uri));
        } catch (DeploymentException | IOException e) {
            LOG.error("Unable to connect to WebSocket endpoint at " + uri, e);
        }
    }
}
The ComponentActivator runs at startup/deployment time of component.ear and connects to the WebSocket at ws://localhost:8080/events.
We additionally need a decoder to decode our distributed event websocket messages, like this:
package de.luxig.distevent.component;

// import ... (StringUtils comes from Apache Commons Lang)
import javax.websocket.Decoder.Text;

public class JacksonDecoder implements Text<DistributedEvent> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void init(EndpointConfig config) {
    }

    @Override
    public void destroy() {
    }

    @Override
    public DistributedEvent decode(String s) throws DecodeException {
        // wire format: <fully qualified class name>|<JSON payload>
        String className = StringUtils.substringBefore(s, "|");
        String json = StringUtils.substringAfter(s, "|");
        try {
            Class<?> type = Class.forName(className);
            if (!DistributedEvent.class.isAssignableFrom(type)) {
                throw new DecodeException(s, "Event type is not assignable to " + DistributedEvent.class.getName());
            }
            return (DistributedEvent) MAPPER.readValue(json, type);
        } catch (ClassNotFoundException e) {
            throw new DecodeException(s, "Unable to load class " + className, e);
        } catch (IOException e) {
            throw new DecodeException(s, "Unable to decode JSON " + json, e);
        }
    }

    @Override
    public boolean willDecode(String s) {
        return true;
    }
}
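The type check is the part of the decoder that deserves a second look, because the class name arrives over the wire. Here is a JDK-only sketch of that step in isolation. The class EventTypeResolver and the nested stand-in types are hypothetical, and loading the class without initializing it is a choice made for this sketch, not something the decoder above does:

```java
/** Hypothetical sketch, JDK only: resolves an event class name received over the wire. */
final class EventTypeResolver {

    /** Stand-in for the marker interface from shared-api.jar. */
    interface DistributedEvent { }

    /** Stand-in for a concrete event type. */
    static final class FeatureEnabled implements DistributedEvent { }

    private EventTypeResolver() { }

    /** Loads the named class and rejects anything that is not a DistributedEvent. */
    static Class<? extends DistributedEvent> resolve(String className) {
        try {
            // initialize=false: do not run static initializers of a class we may reject
            Class<?> type = Class.forName(className, false,
                    EventTypeResolver.class.getClassLoader());
            if (!DistributedEvent.class.isAssignableFrom(type)) {
                throw new IllegalArgumentException(className + " is not a DistributedEvent");
            }
            return type.asSubclass(DistributedEvent.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown event class " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(FeatureEnabled.class.getName()));
        try {
            resolve("java.lang.String");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Returning a Class<? extends DistributedEvent> lets the caller hand the type to the JSON mapper without an unchecked cast. If the WebSocket is reachable by clients you do not control, an explicit list of allowed event classes would be a stricter alternative.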
Now the component.ear can be packaged and deployed anywhere (also on remote servers) or, as usual, next to the core.ear.
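For a deployment on a different server, the endpoint address ws://localhost:8080/events can no longer be hard-coded. One possible way to handle this, sketched with the plain JDK, is to read the address from a system property and fall back to the local default. The property name distevent.endpoint and the class CoreEndpointLocator are made up for this example.

```java
import java.net.URI;

/** Hypothetical helper: resolves the WebSocket URI of core.ear for the activator. */
final class CoreEndpointLocator {

    /** Made-up property name, e.g. -Ddistevent.endpoint=ws://core-host:8080/events */
    static final String PROPERTY = "distevent.endpoint";

    /** Default used in this post when both EARs share one server. */
    static final String DEFAULT_URI = "ws://localhost:8080/events";

    private CoreEndpointLocator() { }

    /** Returns the configured endpoint, or the local default if nothing is set. */
    static URI endpointUri() {
        URI uri = URI.create(System.getProperty(PROPERTY, DEFAULT_URI));
        String scheme = uri.getScheme();
        if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
            throw new IllegalArgumentException("Not a WebSocket URI: " + uri);
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println("Connecting to " + endpointUri());
    }
}
```

The activator could then pass CoreEndpointLocator.endpointUri() to connectToServer instead of the fixed string.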
Finally
That’s it! The CDI events are transported in a generic way to every EAR or WAR that connects to the WebSocket. All you have to do is fire the events via CDI on the sender side and @Observes them on the consumer side. Just like we do all day!
If both sides know the Jackson encoder and decoder, the component.ear can also distribute events back to the core.ear. It’s the nature of WebSockets and quite straightforward: one just has to implement @OnMessage within the ServerEndpoint as well.