@@ -86,3 +86,93 @@ public class FastpassUiApplication {
86
86
}
87
87
}
88
88
` ` `
89
+
90
+
91
+ # Toll-intake
92
+
93
+ application.properties
94
+
95
+ ` ` ` properties
96
+ # first subscriber
97
+ spring.cloud.stream.bindings.readtollcharge-in-0.destination=tolltopic
98
+ spring.cloud.stream.bindings.readtollcharge-in-0.content-type=application/json
99
+
100
+ # second subscriber
101
+ spring.cloud.stream.bindings.processtollcharge-in-0.destination=tolltopic
102
+ spring.cloud.stream.bindings.processtollcharge-in-0.content-type=application/json
103
+
104
+ # Below fastpassprocessor needs to create manaully
105
+ spring.cloud.stream.bindings.processtollcharge-out-0.destination=fastpassprocessor
106
+ spring.cloud.stream.bindings.processtollcharge-out-0.content-type=application/json
107
+
108
+ spring.cloud.stream.function.routing.enabled=true
109
+ spring.cloud.function.routing-expression=headers[' speed' ] == ' fast' ? ' readTollChargeFast' : ' readTollChargeSlow'
110
+ spring.cloud.stream.bindings.functionRouter-in-0.destination=tolltopic
111
+ spring.cloud.stream.bindings.functionRouter-in-0.content-type=application/json
112
+
113
+
114
+ spring.rabbitmq.host=127.0.0.1
115
+ spring.rabbitmq.port=5672
116
+ spring.rabbitmq.username=guest
117
+ spring.rabbitmq.password=guest
118
+
119
+ ` ` `
120
+
121
+ TollIntakeApplication.java
122
+
123
+ ` ` ` java
124
+ package com.example.tollintake;
125
+
126
+ import java.time.Instant;
127
+ import java.util.Scanner;
128
+ import java.util.function.Consumer;
129
+ import java.util.function.Function;
130
+
131
+ import org.springframework.boot.CommandLineRunner;
132
+ import org.springframework.boot.SpringApplication;
133
+ import org.springframework.boot.autoconfigure.SpringBootApplication;
134
+ import org.springframework.context.annotation.Bean;
135
+
136
+ @SpringBootApplication
137
+ public class TollIntakeApplication implements CommandLineRunner {
138
+
139
+ public static void main(String[] args) {
140
+ SpringApplication.run(TollIntakeApplication.class, args);
141
+ }
142
+
143
+ @Bean
144
+ public Consumer< FastPassToll> readTollChargeFast () {
145
+ return value -> {
146
+ System.out.println(" received message for (fast) customer " + value.getFastPassId () + " at " + Instant.now().toString ());
147
+ };
148
+ }
149
+
150
+ @Bean
151
+ public Consumer< FastPassToll> readTollChargeSlow () {
152
+ return value -> {
153
+ System.out.println(" received message for (slow) customer " + value.getFastPassId () + " at " + Instant.now().toString ());
154
+ };
155
+ }
156
+
157
+ @Bean
158
+ public Function< FastPassToll, FastPassToll> processTollCharge () {
159
+ return value -> {
160
+ System.out.println(" Processing message" );
161
+ value.setStatus(" processed" );
162
+ return value;
163
+ };
164
+ }
165
+
166
+ @Override
167
+ public void run(String... args) throws Exception {
168
+ System.out.println(" listening ..." );
169
+
170
+ //wait for input
171
+ Scanner scanner = new Scanner(System.in);
172
+ String line = scanner.nextLine ();
173
+
174
+ }
175
+
176
+ }
177
+
178
+ ` ` `
0 commit comments