|  | 
| 1 | 1 | package org.javaee8.jaxrs.sseproducer; | 
| 2 | 2 | 
 | 
|  | 3 | +import static org.hamcrest.CoreMatchers.instanceOf; | 
| 3 | 4 | import static org.jboss.shrinkwrap.api.ShrinkWrap.create; | 
|  | 5 | +import static org.junit.Assert.assertNotNull; | 
|  | 6 | +import static org.junit.Assert.assertThat; | 
| 4 | 7 | 
 | 
| 5 |  | -import java.io.IOException; | 
| 6 | 8 | import java.net.URL; | 
| 7 |  | -import java.util.Arrays; | 
| 8 | 9 | import java.util.Date; | 
|  | 10 | +import java.util.Queue; | 
|  | 11 | +import java.util.concurrent.ConcurrentLinkedQueue; | 
|  | 12 | +import java.util.function.Consumer; | 
|  | 13 | + | 
| 9 | 14 | import javax.json.bind.Jsonb; | 
| 10 | 15 | import javax.json.bind.JsonbBuilder; | 
|  | 16 | +import javax.ws.rs.client.Client; | 
|  | 17 | +import javax.ws.rs.client.ClientBuilder; | 
|  | 18 | +import javax.ws.rs.client.WebTarget; | 
|  | 19 | +import javax.ws.rs.core.Response; | 
|  | 20 | +import javax.ws.rs.sse.InboundSseEvent; | 
|  | 21 | +import javax.ws.rs.sse.SseEventSource; | 
| 11 | 22 | 
 | 
|  | 23 | +import org.glassfish.jersey.client.ClientConfig; | 
|  | 24 | +import org.glassfish.jersey.client.ClientProperties; | 
|  | 25 | +import org.glassfish.jersey.client.HttpUrlConnectorProvider; | 
|  | 26 | +import org.hamcrest.Matchers; | 
| 12 | 27 | import org.javaee8.jaxrs.sseproducer.data.EventData; | 
| 13 | 28 | import org.javaee8.jaxrs.sseproducer.producer.SseResource; | 
| 14 | 29 | import org.javaee8.jaxrs.sseproducer.rest.RestApplication; | 
|  | 
| 22 | 37 | import org.junit.Test; | 
| 23 | 38 | import org.junit.runner.RunWith; | 
| 24 | 39 | 
 | 
| 25 |  | -import javax.ws.rs.client.Client; | 
| 26 |  | -import javax.ws.rs.client.ClientBuilder; | 
| 27 |  | -import javax.ws.rs.client.WebTarget; | 
| 28 |  | -import javax.ws.rs.sse.SseEventSource; | 
| 29 |  | -import static org.hamcrest.CoreMatchers.instanceOf; | 
| 30 |  | -import static org.junit.Assert.assertNotNull; | 
| 31 |  | -import static org.junit.Assert.assertThat; | 
| 32 |  | -import static org.junit.Assert.assertTrue; | 
| 33 |  | - | 
| 34 | 40 | /** | 
|  | 41 | + * Test example for the Server-Sent Events with the Jersey JAX-RS implementation. | 
|  | 42 | + * | 
| 35 | 43 |  * @author Daniel Contreras | 
|  | 44 | + * @author David Matějček | 
| 36 | 45 |  */ | 
| 37 | 46 | @RunWith(Arquillian.class) | 
| 38 | 47 | public class SseResourceTest { | 
| 39 | 48 | 
 | 
|  | 49 | + private static final String[] EVENT_TYPES = {"INIT", "EVENT", "FINISH"}; | 
|  | 50 | + | 
| 40 | 51 |  @ArquillianResource | 
| 41 | 52 |  private URL base; | 
| 42 | 53 | 
 | 
| 43 | 54 |  private Client sseClient; | 
| 44 | 55 |  private WebTarget target; | 
|  | 56 | + private SseEventSource eventSource; | 
| 45 | 57 | 
 | 
| 46 |  | - SseEventSource eventSource; | 
| 47 |  | - | 
| 48 |  | - @Deployment(testable = false) | 
|  | 58 | + @Deployment(testable = true) | 
| 49 | 59 |  public static WebArchive createDeployment() { | 
| 50 |  | - return create(WebArchive.class) | 
| 51 |  | - .addClasses(RestApplication.class, SseResource.class, EventData.class, JsonbBuilder.class, Jsonb.class); | 
|  | 60 | + return create(WebArchive.class).addClasses(RestApplication.class, SseResource.class, EventData.class); | 
| 52 | 61 |  } | 
| 53 | 62 | 
 | 
|  | 63 | + | 
|  | 64 | + /** | 
|  | 65 | + * Initializes the client, target and the eventSource used to create event consumers | 
|  | 66 | + */ | 
| 54 | 67 |  @Before | 
| 55 | 68 |  public void setup() { | 
| 56 |  | - this.sseClient = ClientBuilder.newClient(); | 
| 57 |  | - this.target = this.sseClient.target(base + "rest/sse/register"); | 
| 58 |  | - eventSource = SseEventSource.target(target).build(); | 
|  | 69 | + // this is needed to avoid a conflict with embedded server, that can have | 
|  | 70 | + // customized configuration and connector providers. | 
|  | 71 | + final ClientConfig configuration = new ClientConfig(); | 
|  | 72 | + configuration.property(ClientProperties.CONNECT_TIMEOUT, 100); | 
|  | 73 | + configuration.property(ClientProperties.READ_TIMEOUT, 5000); | 
|  | 74 | + configuration.connectorProvider(new HttpUrlConnectorProvider()); | 
|  | 75 | + this.sseClient = ClientBuilder.newClient(configuration); | 
|  | 76 | + this.target = this.sseClient.target(this.base + "rest/sse/register"); | 
|  | 77 | + this.eventSource = SseEventSource.target(this.target).build(); | 
| 59 | 78 |  System.out.println("SSE Event source created........"); | 
|  | 79 | + final Response response = this.target.request().get(); | 
|  | 80 | + assertThat("GET response status - server is not ready", response.getStatus(), | 
|  | 81 | + Matchers.equalTo(Response.Status.OK.getStatusCode())); | 
| 60 | 82 |  } | 
| 61 | 83 | 
 | 
|  | 84 | + | 
|  | 85 | + /** | 
|  | 86 | + * Closes all client resources. | 
|  | 87 | + */ | 
| 62 | 88 |  @After | 
| 63 | 89 |  public void teardown() { | 
| 64 |  | - eventSource.close(); | 
|  | 90 | + this.eventSource.close(); | 
| 65 | 91 |  System.out.println("Closed SSE Event source.."); | 
| 66 |  | - sseClient.close(); | 
|  | 92 | + this.sseClient.close(); | 
| 67 | 93 |  System.out.println("Closed JAX-RS client.."); | 
| 68 | 94 |  } | 
| 69 | 95 | 
 | 
| 70 |  | - String[] types = {"INIT", "EVENT", "FINISH"}; | 
| 71 | 96 | 
 | 
| 72 |  | - @Test | 
|  | 97 | + /** | 
|  | 98 | + * Registers reaction on events, waits for events and checks their content. | 
|  | 99 | + * | 
|  | 100 | + * @throws Exception | 
|  | 101 | + */ | 
|  | 102 | + @Test(timeout = 5000) | 
| 73 | 103 |  @RunAsClient | 
| 74 |  | - public void testSSE() throws IOException { | 
| 75 |  | - | 
| 76 |  | - Jsonbjsonb = JsonbBuilder.create(); | 
| 77 |  | - | 
| 78 |  | - System.out.println("SSE Client triggered in thread " + Thread.currentThread().getName()); | 
| 79 |  | - try { | 
| 80 |  | - eventSource.register( | 
| 81 |  | -  (sseEvent) | 
| 82 |  | -  -> { | 
| 83 |  | - assertTrue(Arrays.asList(types).contains(sseEvent.getName())); | 
| 84 |  | - assertNotNull(sseEvent.readData()); | 
| 85 |  | - EventDataevent = jsonb.fromJson(sseEvent.readData(), EventData.class); | 
| 86 |  | - assertThat(event.getTime(), instanceOf(Date.class)); | 
| 87 |  | - assertNotNull(event.getId()); | 
| 88 |  | - assertTrue(event.getComment().contains("event:")); | 
| 89 |  | - System.out.println("\nSSE Event received :: " + event.toString() +"\n"); | 
| 90 |  | -  | 
| 91 |  | -  }, | 
| 92 |  | -  (e) -> e.printStackTrace()); | 
| 93 |  | - | 
| 94 |  | - eventSource.open(); | 
| 95 |  | - Thread.sleep(1500); | 
| 96 |  | - } catch (Exceptione) { | 
| 97 |  | - System.out.println("Error on SSE Test"); | 
| 98 |  | - System.out.println(e.getMessage()); | 
|  | 104 | + public void testSSE() throws Exception { | 
|  | 105 | +finalQueue<Throwable> asyncExceptions = newConcurrentLinkedQueue<>(); | 
|  | 106 | + finalQueue<EventData> receivedEvents = newConcurrentLinkedQueue<>(); | 
|  | 107 | +// jsonb is thread safe! | 
|  | 108 | + finalJsonbjsonb = JsonbBuilder.create(); | 
|  | 109 | + finalConsumer<InboundSseEvent> onEvent = (sseEvent) -> { | 
|  | 110 | + assertThat("event type", sseEvent.getName(), Matchers.isOneOf(EVENT_TYPES)); | 
|  | 111 | + finalStringdata = sseEvent.readData(); | 
|  | 112 | + System.out.println("Data received as string:\n" + data); | 
|  | 113 | + assertNotNull("data received as string", data); | 
|  | 114 | + finalEventDataevent = jsonb.fromJson(data, EventData.class); | 
|  | 115 | + receivedEvents.add(event); | 
|  | 116 | + assertThat("event.time", event.getTime(), instanceOf(Date.class)); | 
|  | 117 | + assertNotNull("event.id", event.getId()); | 
|  | 118 | + assertThat("event.comment", event.getComment(), Matchers.containsString("event:")); | 
|  | 119 | + }; | 
|  | 120 | + this.eventSource.register(onEvent, asyncExceptions::add); | 
|  | 121 | + System.out.println("Server Side Events Client registered in the test thread."); | 
|  | 122 | + // following line starts acceptation of events. | 
|  | 123 | +this.eventSource.open(); | 
|  | 124 | + // don't end the test until we have all events or timeout or error comes. | 
|  | 125 | + // this is not an obvious implementation, we only need to hold the test until all events | 
|  | 126 | + // are asynchronously processed. | 
|  | 127 | + while (receivedEvents.size() <= 5 && asyncExceptions.isEmpty()) { | 
|  | 128 | + Thread.sleep(10L); | 
| 99 | 129 |  } | 
| 100 |  | - | 
|  | 130 | +assertThat("receiver exceptions", asyncExceptions, Matchers.emptyIterable()); | 
| 101 | 131 |  } | 
| 102 |  | - | 
| 103 | 132 | } | 
0 commit comments