\$\begingroup\$

I'm trying to implement a thread-safe reactive token service. All subscribers must wait until the token is first received or, once it has expired, refreshed. It works, but I'd like a second opinion on its thread safety, as I'm not sure how this should be done with a reactive stack. The token lives for 1 hour, but I refresh it 5000 ms before it expires.
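To make the timing concrete, here is a minimal standalone sketch of the refresh-deadline arithmetic described above. The class and method names (`RefreshDeadline`, `refreshAtMillis`, `needsRefresh`) are hypothetical and exist only for illustration; the real calculation lives in the service class.

```java
final class RefreshDeadline {
    // Assumed to match UPDATE_TOKEN_BEFORE in the service: refresh 5000 ms early.
    static final long UPDATE_TOKEN_BEFORE_MILLIS = 5_000L;

    /** Epoch millisecond from which the cached token is treated as stale. */
    static long refreshAtMillis(long nowMillis, long expiresInSeconds) {
        return nowMillis + expiresInSeconds * 1_000L - UPDATE_TOKEN_BEFORE_MILLIS;
    }

    /** Same comparison as the expiry check in getToken(). */
    static boolean needsRefresh(long nowMillis, long refreshAtMillis) {
        return nowMillis >= refreshAtMillis;
    }

    public static void main(String[] args) {
        // A 1-hour token obtained at t = 0 becomes stale at 3 600 000 - 5 000 ms.
        long refreshAt = refreshAtMillis(0L, 3_600L);
        System.out.println(refreshAt);                           // 3595000
        System.out.println(needsRefresh(3_594_999L, refreshAt)); // false
        System.out.println(needsRefresh(3_595_000L, refreshAt)); // true
    }
}
```

So a token that lives 3600 s is served from the cache for 3595 s and refreshed on the first request after that.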

@Service
@Slf4j
public class TokenService {
    // time before real token expires
    // token expiry time calculated like this:
    // time = expiry_time - UPDATE_TOKEN_BEFORE
    // thus the token will be updated a bit earlier than it expires
    private static final int UPDATE_TOKEN_BEFORE = 5000;

    private final WebClient webClient;
    private final TokenConfiguration config;
    @SuppressWarnings("FieldCanBeLocal")
    private volatile String token;
    private volatile long expiresMillis;
    private volatile boolean hasToken = false;
    private volatile boolean loading = false;
    private final Sinks.Many<String> sink;

    public TokenService(WebClient webClient, TokenConfiguration config) {
        this.webClient = webClient;
        this.config = config;
        this.sink = Sinks.unsafe().many().replay().latest();
    }

    /**
     * Get the cached token or request a new one from the auth service.
     * Makes subscribers wait until the first one gets the token from the auth service, then supplies them with
     * already retrieved token.
     *
     * @return {@link Mono} of token string.
     */
    public Mono<String> getToken() {
        if (!hasToken || System.currentTimeMillis() >= expiresMillis) {
            if (!loading) {
                // flag that we're processing the token
                this.loading = true;
                return getTokenMono()
                        // update token & expiresMillis locally
                        .map(this::updateToken)
                        .map(token -> {
                            final Sinks.EmitResult result = sink.tryEmitNext(token);
                            // throw EmissionException if emitting fails
                            result.orThrow();
                            return token;
                        })
                        // finally, flag we're done processing
                        .doOnSuccess(i -> this.loading = false)
                        // if something goes wrong, flag that we should try again
                        .doOnError(i -> {
                            this.hasToken = false;
                            this.loading = false;
                        });
            }
        }
        // emit when the token gets ready
        return sink.asFlux().filter(token -> !token.isEmpty()).next();
    }

    /**
     * Invalidate the token.
     */
    public void invalidateToken() {
        this.hasToken = false;
    }

    /**
     * Retrieve the token from the auth service.
     *
     * @return Mono of {@link TokenResponse}.
     */
    private Mono<TokenResponse> getTokenMono() {
        log.info("Retrieving the access token from upstream");
        return webClient.post()
                .uri(config.getTokenUrl())
                .body(BodyInserters.fromFormData("grant_type", "credentials")
                        .with("client_id", config.getClientId())
                        .with("client_secret", config.getClientSecret()))
                .retrieve()
                .bodyToMono(TokenResponse.class);
    }

    /**
     * Update the token in this class and return it.
     *
     * @param response see {@link TokenResponse}
     * @return the token from {@code response}.
     */
    private String updateToken(TokenResponse response) {
        // make the token expire a few seconds before it is set to expire
        log.info("New token: {}", response);
        this.expiresMillis = (response.getExpiresIn() * 1000) + System.currentTimeMillis() - UPDATE_TOKEN_BEFORE;
        this.token = "Bearer " + response.getAccessToken();
        this.hasToken = true;
        return this.token;
    }
}
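The part that seems most relevant to the thread-safety question is the `if (!loading) { this.loading = true; ... }` check in `getToken()`: `volatile` guarantees visibility, but the read and the write are two separate steps, so two threads can both observe `false` and both start a request. Below is a minimal sketch of an atomic alternative using `AtomicBoolean.compareAndSet` from the JDK. The `LoadGuard` class and its method names are hypothetical, not part of the service above, and the sketch covers only the flag, not the Reactor pipeline.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class LoadGuard {
    private final AtomicBoolean loading = new AtomicBoolean(false);

    /** Returns true for exactly one caller until release() is called. */
    boolean tryAcquire() {
        // Check and set happen as one atomic step, unlike a volatile read followed by a write.
        return loading.compareAndSet(false, true);
    }

    /** Marks the load as finished, successfully or not, so the next caller may start one. */
    void release() {
        loading.set(false);
    }

    public static void main(String[] args) {
        LoadGuard guard = new LoadGuard();
        System.out.println(guard.tryAcquire()); // true: this caller performs the request
        System.out.println(guard.tryAcquire()); // false: a load is already in flight
        guard.release();
        System.out.println(guard.tryAcquire()); // true: a new load may start
    }
}
```

In the service, the caller that wins `tryAcquire()` would issue the upstream request and call `release()` from both the success and error paths, while every other caller would fall through to the sink.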
Sᴀᴍ Onᴇᴌᴀ
asked Apr 12, 2023 at 8:36
\$\endgroup\$
