Listening to many Kafka Streams in Spring
I'm developing an application with an event-driven architecture.
I'm trying to model the following flow of events:
UserAccountCreated (user-management-events) -> sending an e-mail -> MailNotificationSent (notification-service-events)
The notification-service application executes the whole flow. It waits for the UserAccountCreated event by listening to the user-management-events topic. When the event is received, the application sends the email and publishes a new event, MailNotificationSent, to the notification-service-events topic.
I have no problem listening to the first event (UserAccountCreated): the application receives it and performs the rest of the flow. I also have no problem publishing the MailNotificationSent event. However, for development purposes I also want to listen to the MailNotificationSent event in the notification service, so the application has to listen to both UserAccountCreated and MailNotificationSent. This is the part I can't get to work.
Let's take a look at the implementation:
NotificationStreams:
public interface NotificationStreams {

    String INPUT = "notification-service-events-in";
    String OUTPUT = "notification-service-events-out";

    @Input(INPUT)
    SubscribableChannel inboundEvents();

    @Output(OUTPUT)
    MessageChannel outboundEvents();
}
NotificationEventsListener:
@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationEventsListener {

    @StreamListener(NotificationStreams.INPUT)
    public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
        input.subscribe(event -> {
            log.info("Received event ActivationLinkSent: " + event.toString());
        });
    }
}
UserManagementEvents:
public interface UserManagementEvents {

    String INPUT = "user-management-events";

    @Input(INPUT)
    SubscribableChannel inboundEvents();
}
UserManagementEventsListener:
@Slf4j
@Component
@RequiredArgsConstructor
public class UserManagementEventsListener {

    private final Gate gate;

    @StreamListener(UserManagementEvents.INPUT)
    public void userManagementEvents(Flux<UserAccountCreated> input) {
        input.subscribe(event -> {
            log.info("Received event UserAccountCreated: " + event.toString());
            gate.dispatch(SendActivationLink.builder()
                    .email(event.getEmail())
                    .username(event.getUsername())
                    .build()
            );
        });
    }
}
KafkaStreamsConfig:
@EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
public class KafkaStreamsConfig {
}
EventPublisher:
@Slf4j
@RequiredArgsConstructor
@Component
public class EventPublisher {

    private final NotificationStreams eventsStreams;
    private final AvroMessageBuilder messageBuilder;

    public void publish(Event event) {
        MessageChannel messageChannel = eventsStreams.outboundEvents();
        AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent();
        activationLinkSent.setEmail(((ActivationLinkSent) event).getEmail());
        activationLinkSent.setUsername(((ActivationLinkSent) event).getUsername() + "-domain");
        activationLinkSent.setTimestamp(System.currentTimeMillis());
        messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
    }
}
application config:
spring:
  devtools:
    restart:
      enabled: true
  cloud:
    stream:
      default:
        contentType: application/*+avro
      kafka:
        binder:
          brokers: localhost:9092
      schemaRegistryClient:
        endpoint: http://localhost:8990
  kafka:
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
kafka:
  bootstrap:
    servers: localhost:9092
The application seems to ignore the notification-service-events listener. It works when listening to only one stream.
I'm almost 100% sure that this is not an issue with publishing the event, because I've connected manually to Kafka and verified that messages are published properly:
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning
Do you have any ideas what else I should check? Is there any additional configuration on the Spring side?
java spring apache-kafka spring-kafka
edited Nov 26 '18 at 17:11 by Matthias J. Sax
asked Nov 25 '18 at 14:03 by mateusz
1 Answer
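I've found the problem: the bindings configuration was missing.
Without an explicit destination, Spring Cloud Stream uses the binding name as the topic name, so the input binding read from the topic notification-service-events-in while the output binding wrote to notification-service-events-out. I should have added the following lines to the application properties: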
cloud:
  stream:
    bindings:
      notification-service-events-in:
        destination: notification-service-events
      notification-service-events-out:
        destination: notification-service-events
      user-management-events-in:
        destination: user-management-events
In the user-management-service I didn't have this problem because I used a different property there, a default destination that applies to all bindings:
cloud:
  stream:
    default:
      contentType: application/*+avro
      destination: user-management-events
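To make the difference between the two configuration fragments above concrete, here is a minimal sketch of the lookup order as I understand it from the Spring Cloud Stream documentation: a destination set on the binding wins, otherwise the value under `default` is used, otherwise the binding name itself becomes the topic. The class `BindingDestinations` and its `resolve` method are hypothetical names made up for this illustration. They are not part of Spring Cloud Stream and only model the behaviour in plain Java.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, NOT a Spring Cloud Stream API.
public class BindingDestinations {

    /**
     * Models the assumed lookup order for a binding's topic:
     * 1. explicit per-binding destination (bindings.<name>.destination)
     * 2. shared default destination (default.destination), if any
     * 3. the binding name itself
     */
    public static String resolve(String bindingName,
                                 Map<String, String> perBinding,
                                 String defaultDestination) {
        String explicit = perBinding.get(bindingName);
        if (explicit != null) {
            return explicit;
        }
        if (defaultDestination != null) {
            return defaultDestination;
        }
        return bindingName;
    }

    public static void main(String[] args) {
        String in = "notification-service-events-in";
        String out = "notification-service-events-out";

        // Case 1: no destination configured (the setup from the question).
        // Input and output end up on two different topics.
        Map<String, String> none = new HashMap<>();
        System.out.println(resolve(in, none, null));
        System.out.println(resolve(out, none, null));

        // Case 2: per-binding destinations (the fix from the answer).
        // Both bindings now point at the same topic.
        Map<String, String> bindings = new HashMap<>();
        bindings.put(in, "notification-service-events");
        bindings.put(out, "notification-service-events");
        System.out.println(resolve(in, bindings, null));
        System.out.println(resolve(out, bindings, null));

        // Case 3: only a default destination (the user-management-service setup).
        // Every binding without its own destination uses the default.
        System.out.println(resolve("any-binding", none, "user-management-events"));
    }
}
```

In case 1 the listener and the publisher never meet, which matches the symptom in the question: the console consumer sees messages on notification-service-events-out while the listener waits on notification-service-events-in. A shared default destination only suits services where every binding is meant to use the same topic, so in a service with several topics the per-binding form is the safer choice.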
answered Nov 28 '18 at 19:36 by mateusz