Listening to many Kafka Streams in Spring












0















I'm developing an application in the event-driven architecture.



I'm trying to model the following flow of events:



UserAccountCreated (user-management-events) -> sending an e-mail -> MailNotificationSent (notification-service-events)



The notification-service application executes the whole flow. It waits for the UserAccountCreated event by listening to user-management-events topic. When the event is received, the application sends the email and publishes a new event - MailNotificationSent to the notification-service-events topic.



I have no problems with listening to the first event (UserAccountCreated) - application receives it and performs the rest of the flow. I also have no problem with publishing the MailNotificationSent event. Unfortunately, for development purposes, I want to listen to the MailNotificationSent event in the notification service, so the application has to listen to both UserAccountCreated and MailNotificationSent. Here I'm not able to make it works.



Let's take a look at the implementation:



NotificationStreams:



public interface NotificationStreams {

String INPUT = "notification-service-events-in";
String OUTPUT = "notification-service-events-out";

@Input(INPUT)
SubscribableChannel inboundEvents();

@Output(OUTPUT)
MessageChannel outboundEvents();
}


NotificationsEventsListener:



@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationEventsListener {


@StreamListener(NotificationStreams.INPUT)
public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
input.subscribe(event -> {
log.info("Received event ActivationLinkSent: " + event.toString());
});
}
}


UserManagementEvents:



public interface UserManagementEvents {

String INPUT = "user-management-events";

@Input(INPUT)
SubscribableChannel inboundEvents();
}


UserManagementEventsListener:



@Slf4j
@Component
@RequiredArgsConstructor
public class UserManagementEventsListener {

private final Gate gate;

@StreamListener(UserManagementEvents.INPUT)
public void userManagementEvents(Flux<UserAccountCreated> input) {
input.subscribe(event -> {
log.info("Received event UserAccountCreated: " + event.toString());
gate.dispatch(SendActivationLink.builder()
.email(event.getEmail())
.username(event.getUsername())
.build()
);
});
}
}


KafkaStreamsConfig:



@EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
public class KafkaStreamsConfig {
}


EventPublisher:



@Slf4j
@RequiredArgsConstructor
@Component
public class EventPublisher {

private final NotificationStreams eventsStreams;

private final AvroMessageBuilder messageBuilder;

public void publish(Event event) {
MessageChannel messageChannel = eventsStreams.outboundEvents();

AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent(); activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
activationLinkSent.setTimestamp(System.currentTimeMillis());
messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
}
}


application config:



spring:
devtools:
restart:
enabled: true
cloud:
stream:
default:
contentType: application/*+avro
kafka:
binder:
brokers: localhost:9092
schemaRegistryClient:
endpoint: http://localhost:8990
kafka:
consumer:
group-id: notification-group
auto-offset-reset: earliest
kafka:
bootstrap:
servers: localhost:9092


The application seems to ignore the notification-service-events listener. It works when listening to only one stream.



I'm almost 100% sure that this is not an issue with publishing the event, because I've connected manually to Kafka and verified that messages are published properly:



kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning


Do you have any ideas what else I should check? Is there any additional configuration on the Spring side?










share|improve this question





























    0















    I'm developing an application in the event-driven architecture.



    I'm trying to model the following flow of events:



    UserAccountCreated (user-management-events) -> sending an e-mail -> MailNotificationSent (notification-service-events)



    The notification-service application executes the whole flow. It waits for the UserAccountCreated event by listening to user-management-events topic. When the event is received, the application sends the email and publishes a new event - MailNotificationSent to the notification-service-events topic.



    I have no problems with listening to the first event (UserAccountCreated) - application receives it and performs the rest of the flow. I also have no problem with publishing the MailNotificationSent event. Unfortunately, for development purposes, I want to listen to the MailNotificationSent event in the notification service, so the application has to listen to both UserAccountCreated and MailNotificationSent. Here I'm not able to make it works.



    Let's take a look at the implementation:



    NotificationStreams:



    public interface NotificationStreams {

    String INPUT = "notification-service-events-in";
    String OUTPUT = "notification-service-events-out";

    @Input(INPUT)
    SubscribableChannel inboundEvents();

    @Output(OUTPUT)
    MessageChannel outboundEvents();
    }


    NotificationsEventsListener:



    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class NotificationEventsListener {


    @StreamListener(NotificationStreams.INPUT)
    public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
    input.subscribe(event -> {
    log.info("Received event ActivationLinkSent: " + event.toString());
    });
    }
    }


    UserManagementEvents:



    public interface UserManagementEvents {

    String INPUT = "user-management-events";

    @Input(INPUT)
    SubscribableChannel inboundEvents();
    }


    UserManagementEventsListener:



    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class UserManagementEventsListener {

    private final Gate gate;

    @StreamListener(UserManagementEvents.INPUT)
    public void userManagementEvents(Flux<UserAccountCreated> input) {
    input.subscribe(event -> {
    log.info("Received event UserAccountCreated: " + event.toString());
    gate.dispatch(SendActivationLink.builder()
    .email(event.getEmail())
    .username(event.getUsername())
    .build()
    );
    });
    }
    }


    KafkaStreamsConfig:



    @EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
    public class KafkaStreamsConfig {
    }


    EventPublisher:



    @Slf4j
    @RequiredArgsConstructor
    @Component
    public class EventPublisher {

    private final NotificationStreams eventsStreams;

    private final AvroMessageBuilder messageBuilder;

    public void publish(Event event) {
    MessageChannel messageChannel = eventsStreams.outboundEvents();

    AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent(); activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
    activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
    activationLinkSent.setTimestamp(System.currentTimeMillis());
    messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
    }
    }


    application config:



    spring:
    devtools:
    restart:
    enabled: true
    cloud:
    stream:
    default:
    contentType: application/*+avro
    kafka:
    binder:
    brokers: localhost:9092
    schemaRegistryClient:
    endpoint: http://localhost:8990
    kafka:
    consumer:
    group-id: notification-group
    auto-offset-reset: earliest
    kafka:
    bootstrap:
    servers: localhost:9092


    The application seems to ignore the notification-service-events listener. It works when listening to only one stream.



    I'm almost 100% sure that this is not an issue with publishing the event, because I've connected manually to Kafka and verified that messages are published properly:



    kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning


    Do you have any ideas what else I should check? Is there any additional configuration on the Spring side?










    share|improve this question



























      0












      0








      0








      I'm developing an application in the event-driven architecture.



      I'm trying to model the following flow of events:



      UserAccountCreated (user-management-events) -> sending an e-mail -> MailNotificationSent (notification-service-events)



      The notification-service application executes the whole flow. It waits for the UserAccountCreated event by listening to user-management-events topic. When the event is received, the application sends the email and publishes a new event - MailNotificationSent to the notification-service-events topic.



      I have no problems with listening to the first event (UserAccountCreated) - application receives it and performs the rest of the flow. I also have no problem with publishing the MailNotificationSent event. Unfortunately, for development purposes, I want to listen to the MailNotificationSent event in the notification service, so the application has to listen to both UserAccountCreated and MailNotificationSent. Here I'm not able to make it works.



      Let's take a look at the implementation:



      NotificationStreams:



      public interface NotificationStreams {

      String INPUT = "notification-service-events-in";
      String OUTPUT = "notification-service-events-out";

      @Input(INPUT)
      SubscribableChannel inboundEvents();

      @Output(OUTPUT)
      MessageChannel outboundEvents();
      }


      NotificationsEventsListener:



      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class NotificationEventsListener {


      @StreamListener(NotificationStreams.INPUT)
      public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
      input.subscribe(event -> {
      log.info("Received event ActivationLinkSent: " + event.toString());
      });
      }
      }


      UserManagementEvents:



      public interface UserManagementEvents {

      String INPUT = "user-management-events";

      @Input(INPUT)
      SubscribableChannel inboundEvents();
      }


      UserManagementEventsListener:



      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class UserManagementEventsListener {

      private final Gate gate;

      @StreamListener(UserManagementEvents.INPUT)
      public void userManagementEvents(Flux<UserAccountCreated> input) {
      input.subscribe(event -> {
      log.info("Received event UserAccountCreated: " + event.toString());
      gate.dispatch(SendActivationLink.builder()
      .email(event.getEmail())
      .username(event.getUsername())
      .build()
      );
      });
      }
      }


      KafkaStreamsConfig:



      @EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
      public class KafkaStreamsConfig {
      }


      EventPublisher:



      @Slf4j
      @RequiredArgsConstructor
      @Component
      public class EventPublisher {

      private final NotificationStreams eventsStreams;

      private final AvroMessageBuilder messageBuilder;

      public void publish(Event event) {
      MessageChannel messageChannel = eventsStreams.outboundEvents();

      AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent(); activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
      activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
      activationLinkSent.setTimestamp(System.currentTimeMillis());
      messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
      }
      }


      application config:



      spring:
      devtools:
      restart:
      enabled: true
      cloud:
      stream:
      default:
      contentType: application/*+avro
      kafka:
      binder:
      brokers: localhost:9092
      schemaRegistryClient:
      endpoint: http://localhost:8990
      kafka:
      consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      kafka:
      bootstrap:
      servers: localhost:9092


      The application seems to ignore the notification-service-events listener. It works when listening to only one stream.



      I'm almost 100% sure that this is not an issue with publishing the event, because I've connected manually to Kafka and verified that messages are published properly:



      kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning


      Do you have any ideas what else I should check? Is there any additional configuration on the Spring side?










      share|improve this question
















      I'm developing an application in the event-driven architecture.



      I'm trying to model the following flow of events:



      UserAccountCreated (user-management-events) -> sending an e-mail -> MailNotificationSent (notification-service-events)



      The notification-service application executes the whole flow. It waits for the UserAccountCreated event by listening to user-management-events topic. When the event is received, the application sends the email and publishes a new event - MailNotificationSent to the notification-service-events topic.



      I have no problems with listening to the first event (UserAccountCreated) - application receives it and performs the rest of the flow. I also have no problem with publishing the MailNotificationSent event. Unfortunately, for development purposes, I want to listen to the MailNotificationSent event in the notification service, so the application has to listen to both UserAccountCreated and MailNotificationSent. Here I'm not able to make it works.



      Let's take a look at the implementation:



      NotificationStreams:



      public interface NotificationStreams {

      String INPUT = "notification-service-events-in";
      String OUTPUT = "notification-service-events-out";

      @Input(INPUT)
      SubscribableChannel inboundEvents();

      @Output(OUTPUT)
      MessageChannel outboundEvents();
      }


      NotificationsEventsListener:



      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class NotificationEventsListener {


      @StreamListener(NotificationStreams.INPUT)
      public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
      input.subscribe(event -> {
      log.info("Received event ActivationLinkSent: " + event.toString());
      });
      }
      }


      UserManagementEvents:



      public interface UserManagementEvents {

      String INPUT = "user-management-events";

      @Input(INPUT)
      SubscribableChannel inboundEvents();
      }


      UserManagementEventsListener:



      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class UserManagementEventsListener {

      private final Gate gate;

      @StreamListener(UserManagementEvents.INPUT)
      public void userManagementEvents(Flux<UserAccountCreated> input) {
      input.subscribe(event -> {
      log.info("Received event UserAccountCreated: " + event.toString());
      gate.dispatch(SendActivationLink.builder()
      .email(event.getEmail())
      .username(event.getUsername())
      .build()
      );
      });
      }
      }


      KafkaStreamsConfig:



      @EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
      public class KafkaStreamsConfig {
      }


      EventPublisher:



      @Slf4j
      @RequiredArgsConstructor
      @Component
      public class EventPublisher {

      private final NotificationStreams eventsStreams;

      private final AvroMessageBuilder messageBuilder;

      public void publish(Event event) {
      MessageChannel messageChannel = eventsStreams.outboundEvents();

      AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent(); activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
      activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
      activationLinkSent.setTimestamp(System.currentTimeMillis());
      messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
      }
      }


      application config:



      spring:
      devtools:
      restart:
      enabled: true
      cloud:
      stream:
      default:
      contentType: application/*+avro
      kafka:
      binder:
      brokers: localhost:9092
      schemaRegistryClient:
      endpoint: http://localhost:8990
      kafka:
      consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      kafka:
      bootstrap:
      servers: localhost:9092


      The application seems to ignore the notification-service-events listener. It works when listening to only one stream.



      I'm almost 100% sure that this is not an issue with publishing the event, because I've connected manually to Kafka and verified that messages are published properly:



      kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning


      Do you have any ideas what else I should check? Is there any additional configuration on the Spring side?







      java spring apache-kafka spring-kafka






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 26 '18 at 17:11









      Matthias J. Sax

      31k45582




      31k45582










      asked Nov 25 '18 at 14:03









      mateuszmateusz

      901111




      901111
























          1 Answer
          1






          active

          oldest

          votes


















          0














          I've found where the problem was.
          I was missing bindings configuration. In the application properties, I should have added the following lines:



          cloud:
          stream:
          bindings:
          notification-service-events-in:
          destination: notification-service-events
          notification-service-events-out:
          destination: notification-service-events
          user-management-events-in:
          destination: user-management-events


          In the user-management-service I didn't have such a problem because I used a different property:



            cloud:
          stream:
          default:
          contentType: application/*+avro
          destination: user-management-events





          share|improve this answer























            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53468278%2flistening-to-many-kafka-streams-in-spring%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            0














            I've found where the problem was.
            I was missing bindings configuration. In the application properties, I should have added the following lines:



            cloud:
            stream:
            bindings:
            notification-service-events-in:
            destination: notification-service-events
            notification-service-events-out:
            destination: notification-service-events
            user-management-events-in:
            destination: user-management-events


            In the user-management-service I didn't have such a problem because I used a different property:



              cloud:
            stream:
            default:
            contentType: application/*+avro
            destination: user-management-events





            share|improve this answer




























              0














              I've found where the problem was.
              I was missing bindings configuration. In the application properties, I should have added the following lines:



              cloud:
              stream:
              bindings:
              notification-service-events-in:
              destination: notification-service-events
              notification-service-events-out:
              destination: notification-service-events
              user-management-events-in:
              destination: user-management-events


              In the user-management-service I didn't have such a problem because I used a different property:



                cloud:
              stream:
              default:
              contentType: application/*+avro
              destination: user-management-events





              share|improve this answer


























                0












                0








                0







                I've found where the problem was.
                I was missing bindings configuration. In the application properties, I should have added the following lines:



                cloud:
                stream:
                bindings:
                notification-service-events-in:
                destination: notification-service-events
                notification-service-events-out:
                destination: notification-service-events
                user-management-events-in:
                destination: user-management-events


                In the user-management-service I didn't have such a problem because I used a different property:



                  cloud:
                stream:
                default:
                contentType: application/*+avro
                destination: user-management-events





                share|improve this answer













                I've found where the problem was.
                I was missing bindings configuration. In the application properties, I should have added the following lines:



                cloud:
                stream:
                bindings:
                notification-service-events-in:
                destination: notification-service-events
                notification-service-events-out:
                destination: notification-service-events
                user-management-events-in:
                destination: user-management-events


                In the user-management-service I didn't have such a problem because I used a different property:



                  cloud:
                stream:
                default:
                contentType: application/*+avro
                destination: user-management-events






                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 28 '18 at 19:36









                mateuszmateusz

                901111




                901111
































                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53468278%2flistening-to-many-kafka-streams-in-spring%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    Wiesbaden

                    Marschland

                    Dieringhausen