6

I am new to reactive programming. I need to connect to Redis to save and get some data. The Redis instance is hosted in the cloud. I am using the Lettuce connection factory to establish the connection.

When establishing the connection to Redis, the request fails. Here is my Redis configuration class:

package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
/**
 * Spring configuration that wires the reactive Redis connection factory and the
 * reactive operations bean keyed by {@code TaxDetails} with {@code TaxLine} values.
 *
 * <p>Connection settings are resolved from the {@code vcap.services.redis.credentials.*}
 * properties; each placeholder carries a fallback value that is used when the property
 * is absent.
 */
@Configuration
@EnableAsync
public class RedisConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);

    @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
    private String host;

    @Value("${vcap.services.redis.credentials.port:36516}")
    private int port;

    // The placeholder must be wrapped in ${...}; without the braces Spring injects the
    // literal text "$vcap.services...:123456788" as the password and authentication fails.
    // NOTE(review): the fallback credentials are hard-coded in source; consider moving
    // them to external configuration.
    @Value("${vcap.services.redis.credentials.password:123456788}")
    private String password;

    /**
     * Builds the Lettuce-backed reactive connection factory for a standalone Redis node.
     *
     * <p>The factory is returned uninitialised on purpose: it is a Spring-managed bean, so
     * the container invokes {@code afterPropertiesSet()} itself. Calling it here as well
     * would initialise the factory twice.
     *
     * @return the connection factory configured with host, port, password and database 0
     */
    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(host, port);
        standaloneConfig.setPassword(RedisPassword.of(password));
        standaloneConfig.setDatabase(0);

        // Log only non-sensitive settings; serialising the whole configuration would
        // write the password to the log.
        log.info("Redis standalone configuration host={} port={} database={}",
                standaloneConfig.getHostName(), standaloneConfig.getPort(), standaloneConfig.getDatabase());

        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
        return new LettuceConnectionFactory(standaloneConfig, clientConfig);
    }

    /**
     * Creates the reactive Redis operations used to store {@code TaxLine} values under
     * {@code TaxDetails} keys, with both sides serialised as JSON.
     *
     * @param reactiveRedisConnectionFactory the connection factory to run commands against
     * @return a reactive template bound to the JSON key/value serialization context
     */
    @Bean
    ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        Jackson2JsonRedisSerializer<TaxDetails> keySerializer =
                new Jackson2JsonRedisSerializer<>(TaxDetails.class);
        Jackson2JsonRedisSerializer<TaxLine> valueSerializer =
                new Jackson2JsonRedisSerializer<>(TaxLine.class);

        // The String serializer is the default for the parts not overridden below
        // (key and value are replaced with the JSON serializers).
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<TaxDetails, TaxLine> context =
                builder.key(keySerializer).value(valueSerializer).build();

        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }
}

And here is my lookup service class, which actually communicates with Redis during the request:


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Service;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.RedisRepo;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Service that reads and writes {@code TaxLine} entries in Redis through the injected
 * reactive operations, using {@code TaxDetails} as the key.
 */
@Service
public class RedisTaxLineLookUpService {
    private static final Logger log = LoggerFactory.getLogger(RedisTaxLineLookUpService.class);

    @Autowired
    private ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations;

    /**
     * Looks up the keys matching the given tax details and fetches the value stored
     * under each of them.
     *
     * @param taxDetails the key pattern passed to {@code keys(...)}
     * @return the tax lines stored under the matching keys
     */
    public Flux<TaxLine> get(TaxDetails taxDetails) {
        log.info("going to call redis to fetch tax lines{}", JsonUtil.toJsonString(taxDetails));
        Flux<TaxDetails> matchingKeys = redisOperations.keys(taxDetails);
        return matchingKeys.flatMap(redisOperations.opsForValue()::get);
    }

    /**
     * Stores the tax line of the given entry under its tax details.
     *
     * @param redisRepo the key/value pair to persist
     * @return the same {@code redisRepo}, emitted once the set operation has produced its result
     */
    public Mono<RedisRepo> set(RedisRepo redisRepo) {
        log.info("going to call redis to save tax lines{}", JsonUtil.toJsonString(redisRepo.getTaxDetails()));
        Mono<Boolean> saved = redisOperations.opsForValue()
                .set(redisRepo.getTaxDetails(), redisRepo.getTaxLine());
        return saved.map(ignored -> redisRepo);
    }
}

Stack trace :

2020-03-26T16:27:54.513+0000 [APP/PROC/WEB/0] OUT org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 10.11.241.101:36516 | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: | Error has been observed at the following site(s): | |_ checkpoint ⇢ Handler com.sap.slh.tax.attributes.determination.springwebfluxdemo.controller.TaxLinesDeterminationController#saveTaxLines(RedisRepo) [DispatcherHandler] | |_ checkpoint ⇢ HTTP POST "/tax/lines/save/" [ExceptionHandlingWebHandler] | Stack trace: | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1178) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedReactiveConnection(LettuceConnectionFactory.java:952) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:429) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:94) | at org.springframework.data.redis.core.ReactiveRedisTemplate.lambda$doInConnection$0(ReactiveRedisTemplate.java:198) | at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:85) | at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80) | at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) | at 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) | at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) | at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) | at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) | at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) | at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) | at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) | at 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) | at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) | at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) | at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) | at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:489) | at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)

Any suggestions or answers would be very helpful. Thanks in advance!

asked Mar 26, 2020 at 17:05

5 Answers 5

6

I use this RedisConfig.java and it works for me.

/**
 * Redis configuration bound to the {@code spring.redis} properties (setters are
 * generated via {@code @Setter}) that exposes an SSL-enabled reactive connection factory.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Setter
public class RedisConfig {

    private String host;
    private String password;

    /**
     * Creates the primary reactive connection factory with SSL switched on.
     *
     * @param defaultRedisConfig the Redis configuration to connect with
     * @return a Lettuce connection factory using SSL
     */
    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration sslClientConfig =
                LettuceClientConfiguration.builder().useSsl().build();
        LettuceConnectionFactory sslFactory =
                new LettuceConnectionFactory(defaultRedisConfig, sslClientConfig);
        return sslFactory;
    }

    /**
     * Builds the standalone Redis configuration from the bound host and password.
     *
     * @return the standalone configuration
     */
    @Bean
    public RedisConfiguration defaultRedisConfig() {
        RedisStandaloneConfiguration standalone = new RedisStandaloneConfiguration();
        standalone.setHostName(host);
        RedisPassword redisPassword = RedisPassword.of(password);
        standalone.setPassword(redisPassword);
        return standalone;
    }
}
svarog
9,8775 gold badges66 silver badges79 bronze badges
answered Dec 16, 2020 at 2:22
Sign up to request clarification or add additional context in comments.

1 Comment

Saved a lot of time (Y) Heads Up...!!! (Y)
4

I had a similar problem with Redis running on AWS (an EC2 instance). It worked after making the following changes:

sudo vi /etc/redis/redis.conf

  1. Comment out the line: bind 127.0.0.1 ::1
  2. Set the line protected-mode no
  3. Set the line supervised systemd
  4. sudo systemctl restart redis.service
  5. Check the AWS security groups just in case.
answered Nov 1, 2021 at 9:20

Comments

2

I updated my RedisConfig class as follows:

package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import io.lettuce.core.RedisURI;
import io.pivotal.cfenv.core.CfEnv;
/**
 * Redis configuration that reads its credentials from the Cloud Foundry environment
 * (service tagged {@code "redis"}) and exposes a pooled reactive connection factory
 * plus the {@code TaxDetails}/{@code TaxLine} reactive template.
 */
@Configuration
public class RedisConfig {

    CfEnv cfEnv = new CfEnv();
    String tag = "redis";
    String redisHost = cfEnv.findCredentialsByTag(tag).getHost();

    /**
     * Creates the primary reactive connection factory using a pooling client
     * configuration with a 60000 ms command timeout.
     *
     * @param defaultRedisConfig the Redis configuration to connect with
     * @return the Lettuce connection factory
     */
    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        Duration commandTimeout = Duration.ofMillis(60000);
        LettuceClientConfiguration pooledClientConfig =
                LettucePoolingClientConfiguration.builder().commandTimeout(commandTimeout).build();
        return new LettuceConnectionFactory(defaultRedisConfig, pooledClientConfig);
    }

    /**
     * Builds a standalone configuration when the credentials expose a host, otherwise a
     * sentinel configuration parsed from the credentials' URI. Both use database 2.
     *
     * @return the Redis configuration matching the bound credentials
     */
    @Bean
    public RedisConfiguration defaultRedisConfig() {
        if (redisHost == null) {
            // No host in the credentials: derive master and sentinel nodes from the URI.
            RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration();
            String credentialsUri = cfEnv.findCredentialsByTag(tag).getUri();
            RedisURI parsedUri = RedisURI.create(credentialsUri);
            sentinelConfig.master(parsedUri.getSentinelMasterId());
            List<RedisNode> sentinelNodes = parsedUri.getSentinels().stream()
                    .map(sentinelUri -> populateNode(sentinelUri.getHost(), sentinelUri.getPort()))
                    .collect(Collectors.toList());
            for (RedisNode sentinelNode : sentinelNodes) {
                sentinelConfig.addSentinel(sentinelNode);
            }
            sentinelConfig.setPassword(RedisPassword.of(parsedUri.getPassword()));
            sentinelConfig.setDatabase(2);
            return sentinelConfig;
        }

        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();
        String portText = cfEnv.findCredentialsByTag(tag).getPort();
        String passwordText = cfEnv.findCredentialsByTag(tag).getPassword();
        standaloneConfig.setHostName(redisHost);
        standaloneConfig.setPassword(RedisPassword.of(passwordText));
        standaloneConfig.setPort(Integer.parseInt(portText));
        standaloneConfig.setDatabase(2);
        return standaloneConfig;
    }

    /**
     * Creates the reactive template that serialises {@code TaxDetails} keys and
     * {@code TaxLine} values as JSON.
     *
     * @param factory the connection factory to run commands against
     * @return the reactive Redis operations bean
     */
    @Bean
    public ReactiveRedisOperations<TaxDetails, TaxLine> reactiveRedisTemplate(
            ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer defaultSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<TaxLine> taxLineSerializer =
                new Jackson2JsonRedisSerializer<>(TaxLine.class);
        Jackson2JsonRedisSerializer<TaxDetails> taxDetailsSerializer =
                new Jackson2JsonRedisSerializer<>(TaxDetails.class);

        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> contextBuilder =
                RedisSerializationContext.newSerializationContext(defaultSerializer);
        RedisSerializationContext<TaxDetails, TaxLine> serializationContext = contextBuilder
                .key(taxDetailsSerializer)
                .value(taxLineSerializer)
                .build();
        return new ReactiveRedisTemplate<>(factory, serializationContext);
    }

    /** Wraps a host/port pair in a {@code RedisNode}. */
    private RedisNode populateNode(String host, Integer port) {
        return new RedisNode(host, port);
    }
}

Dependencies for cfEnv:

<dependency>
 <groupId>io.pivotal.cfenv</groupId>
 <artifactId>java-cfenv-boot</artifactId>
 <version>2.1.1.RELEASE</version>
</dependency>
svarog
9,8775 gold badges66 silver badges79 bronze badges
answered Mar 30, 2020 at 18:13

Comments

1

After setting useSsl to true, my problem was solved:

/**
 * Connection factory for the {@code prod} profile: host, port and password come from
 * the enclosing configuration's fields, and SSL is enabled.
 *
 * @return the SSL-enabled Lettuce connection factory
 */
@Bean("lettuceConnectionFactory")
@Profile("prod")
LettuceConnectionFactory productionLettuceConnectionFactory() {
    final LettuceConnectionFactory sslFactory = new LettuceConnectionFactory();
    sslFactory.setHostName(redisHostName);
    // The port is held as text, so it is parsed before being applied.
    final int parsedPort = Integer.parseInt(port);
    sslFactory.setPort(parsedPort);
    sslFactory.setPassword(password);
    sslFactory.setUseSsl(true);
    return sslFactory;
}
answered Apr 10, 2024 at 15:55

Comments

0

Make sure to call jedisConFactory.setUseSsl(true); for an AWS Redis connection. By default, SSL is off for Redis connections.

/**
 * Builds a pooled Jedis connection factory from the enclosing configuration's
 * host, port, password, timeout and SSL settings.
 *
 * @return the configured Jedis connection factory
 */
@Bean
JedisConnectionFactory jedisConnectionFactory() {
    // Pool limits: 300 connections in total, 20 idle, wait up to 2000 ms when exhausted.
    final JedisPoolConfig poolSettings = new JedisPoolConfig();
    poolSettings.setMaxTotal(300);
    poolSettings.setMaxIdle(20);
    poolSettings.setMaxWaitMillis(2000);
    poolSettings.setBlockWhenExhausted(true);

    JedisConnectionFactory connectionFactory = new JedisConnectionFactory(poolSettings);

    // Endpoint and credentials.
    connectionFactory.setHostName(redisHost);
    connectionFactory.setPort(redisPort);
    connectionFactory.setPassword(redisPassword);

    // Connection behaviour.
    connectionFactory.setUsePool(true);
    connectionFactory.setTimeout(redisTimeout);
    connectionFactory.setUseSsl(redisSsl);

    return connectionFactory;
}
/**
 * Creates the {@code redisTemplate} bean on top of the Jedis connection factory,
 * with connection exposure enabled.
 *
 * @return the initialised template
 */
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate() {
    final RedisTemplate<String, Object> objectTemplate = new RedisTemplate<>();
    objectTemplate.setConnectionFactory(jedisConnectionFactory());
    // Transaction support is left disabled:
    // objectTemplate.setEnableTransactionSupport(true);
    objectTemplate.setExposeConnection(true);
    objectTemplate.afterPropertiesSet();
    return objectTemplate;
}
answered Jul 12, 2023 at 6:34

Comments

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.