Kafka Streams - Implementing Joining Using the Processor API





I know joins can be performed using the DSL API, but we need to use the Processor API for various reasons.

How would one implement joining two streams using the Processor API? I had some ideas, but I don't think they are right:

  1. One processor with multiple source topics. Use a base Object type for the process interface and then cast to the correct type inside the process method.

  2. Two processors, each with its own source topic. Each processor gets read-only access to the other processor's state store (if that is possible).

Any ideas? I did find the join implementation in KStreamImpl but am having trouble following it. Perhaps an explanation of how the DSL does it?










      apache-kafka-streams






      asked Nov 26 '18 at 16:49 by Chris (edited Nov 26 '18 at 16:55)
























          1 Answer

          Both implementations you suggest are possible. Kafka Streams itself uses five processors to implement a stream-stream join:

          source1 ---> "state maintainer 1" --> "joiner 1" ----+
                                |                   |          |
                             updates          "join lookups"   |
                                |                   |          +-----+
                                |            +------+                |
                                v            |                       v
                            "state 1" <------|------+            "merger" -->
                                             |      |                ^
                            "state 2" <------+      |                |
                                ^                   |          +-----+
                                |                   |          |
                             updates          "join lookups"   |
                                |                   |          |
          source2 ---> "state maintainer 2" --> "joiner 2" ----+

          The left and right pipelines are symmetric. Both have a "state maintainer" and a "joiner" processor. Each "state maintainer" has write access to its own state. Each "joiner" has read access to the other side's state. As a last step, both join result streams are merged together.
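To make the data flow in the diagram concrete, here is a minimal, library-free sketch in plain Java. It is not Kafka Streams code: the class `JoinSketch`, its methods, and the in-memory maps standing in for the two state stores are assumptions for illustration, and it ignores the time windows that a real stream-stream join needs. Each side first writes the arriving record to its own state, then looks up the other side's state, and both joiners emit into one merged output.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Library-free model of the five-node join layout; all names are hypothetical. */
final class JoinSketch {
    // "state 1" and "state 2": one store per input side, keyed by record key.
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();
    // "merger": both joiners append their results to the same output.
    private final List<String> merged = new ArrayList<>();

    /** source1 -> "state maintainer 1" -> "joiner 1". */
    void onLeft(String key, String value) {
        // State maintainer 1: the only writer of the left state.
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        // Joiner 1: read-only lookup in the right state.
        for (String other : rightState.getOrDefault(key, Collections.<String>emptyList())) {
            merged.add(key + ":" + value + "+" + other);
        }
    }

    /** source2 -> "state maintainer 2" -> "joiner 2". */
    void onRight(String key, String value) {
        // State maintainer 2: the only writer of the right state.
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        // Joiner 2: read-only lookup in the left state.
        for (String other : leftState.getOrDefault(key, Collections.<String>emptyList())) {
            merged.add(key + ":" + other + "+" + value);
        }
    }

    /** Snapshot of everything the "merger" has emitted so far. */
    List<String> output() {
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        JoinSketch join = new JoinSketch();
        join.onLeft("k1", "a");   // nothing on the right yet, no output
        join.onRight("k1", "x");  // matches "a"
        join.onLeft("k1", "b");   // matches "x"
        join.onRight("k2", "y");  // no left record for k2, no output
        System.out.println(join.output()); // prints [k1:a+x, k1:b+x]
    }
}
```

Because only the arriving record triggers a lookup, each matching pair is emitted exactly once. In a real topology the same layout would presumably be wired with `Topology#addProcessor` and `Topology#addStateStore`, connecting each store to its own maintainer and to the opposite joiner.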






          • Awesome information! I assume joiner 1 is able to get the proper partition of state store 2 because we are adding the processors and state stores to the same topology?

            – Chris
            Nov 26 '18 at 17:16

          • That happens automatically. A task gets the partitions with the same number assigned across all input topics; i.e., for topics "A" and "B", task-0 gets partitions A-0 and B-0 (and so on). You get one task per partition. To make the join work, both input topics must have the same number of partitions (and be partitioned the same way) to ensure data co-location within the same task.

            – Matthias J. Sax
            Nov 26 '18 at 17:19

          • Not sure if it is possible, but it looks like you cannot get a read-only version of the state store from the processor context. Is that possible? Or do you just need to be careful about your processor implementation?

            – Chris
            Nov 26 '18 at 18:48

          • You always get write access... Just don't write :)

            – Matthias J. Sax
            Nov 26 '18 at 22:57

          • Fair enough :-)

            – Chris
            Nov 27 '18 at 2:00
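The co-location requirement from the comments can be illustrated with a small sketch. The partitioner below is a stand-in (an assumption): it uses `String.hashCode()`, whereas Kafka's default partitioner hashes the serialized key bytes. The class and method names are hypothetical. The point is only that a key lands on the same partition number in both topics when the partition counts and the partitioning function match, and may not when they differ.

```java
/** Toy model of co-partitioning; names and the hash function are illustrative assumptions. */
final class CoPartitioningSketch {

    /** Stand-in partitioner: maps a key to a partition number in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * A task owns the partitions with the same number from every input topic,
     * so two records meet in one task only if their partition numbers agree.
     */
    static boolean coLocated(String key, int partitionsTopicA, int partitionsTopicB) {
        return partitionFor(key, partitionsTopicA) == partitionFor(key, partitionsTopicB);
    }

    public static void main(String[] args) {
        // "d".hashCode() is 100, so the arithmetic is easy to check by hand.
        System.out.println(coLocated("d", 4, 4)); // true:  100 mod 4 = 0 in both topics
        System.out.println(coLocated("d", 4, 6)); // false: 100 mod 4 = 0, 100 mod 6 = 4
    }
}
```

With mismatched partition counts, records for the same key could end up in different tasks, and a joiner would look them up in a state store partition that never received them.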












          Answer by Matthias J. Sax, answered Nov 26 '18 at 17:06 (edited Nov 26 '18 at 17:19)












