Kafka Streams - Implementing Joining Using the Processor Api
I know joins can be performed using the DSL API, but we need to use the Processor API for various reasons.
How would one implement a join of two streams using the Processor API? Here are some ideas I had, but I don't think they are right:
One processor with multiple source topics, using Object as the value type of the Processor interface and casting to the correct type inside the process method.
Two processors, each with its own source topic. Each processor gets read-only access to the other processor's state store (if that is possible).
Any ideas? I did find the join implementation in KStreamImpl but am having trouble following it. Perhaps an explanation of how the DSL does it?
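The first idea can be illustrated with a plain-Java model instead of the real Kafka Streams classes, so it runs without the Kafka dependency. The topic names `left`/`right`, the value types, and the latest-value-per-key (non-windowed) semantics are assumptions made only for illustration. In the Processor API of that era, `ProcessorContext#topic()` reports which source topic the current record came from; here a plain `topic` argument stands in for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of idea 1: one processor fed by two topics. Values arrive
// as Object and are cast according to the (hypothetical) source topic name.
// Semantics are a simple latest-value-per-key join, not a windowed join.
class SingleProcessorJoin {
    static final String LEFT_TOPIC = "left";   // hypothetical topic name
    static final String RIGHT_TOPIC = "right"; // hypothetical topic name

    private final Map<String, String> leftStore = new HashMap<>();
    private final Map<String, Integer> rightStore = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // `topic` plays the role of ProcessorContext#topic() in a real processor.
    void process(String topic, String key, Object value) {
        if (LEFT_TOPIC.equals(topic)) {
            String left = (String) value;        // cast for the left topic's type
            leftStore.put(key, left);
            Integer right = rightStore.get(key);
            if (right != null) output.add(key + ":" + left + "/" + right);
        } else if (RIGHT_TOPIC.equals(topic)) {
            Integer right = (Integer) value;     // cast for the right topic's type
            rightStore.put(key, right);
            String left = leftStore.get(key);
            if (left != null) output.add(key + ":" + left + "/" + right);
        } else {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        SingleProcessorJoin p = new SingleProcessorJoin();
        p.process(LEFT_TOPIC, "a", "x");  // no right value for "a" yet
        p.process(RIGHT_TOPIC, "a", 1);   // joins with "x"
        p.process(RIGHT_TOPIC, "b", 2);   // no left value for "b"
        System.out.println(p.output());   // [a:x/1]
    }
}
```

The price of this layout is the unchecked casts and a single processor that must know every input type; the two-processor idea avoids that by giving each side its own typed processor.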
apache-kafka-streams
asked Nov 26 '18 at 16:49 by Chris (edited Nov 26 '18 at 16:55)
1 Answer
Both implementations you suggest are possible. Kafka Streams itself uses five processors to implement a stream-stream join:
source1 ---> "state maintainer 1" --> "joiner 1" ----+
                      |                   |          |
                   updates         "join lookups"    |
                      |                   |          +-----+
                      |            +------+                |
                      v            |                       v
                  "state 1" <------|------+            "merger" -->
                                   |      |                ^
                  "state 2" <------+      |                |
                      ^                   |          +-----+
                      |                   |          |
                   updates         "join lookups"    |
                      |                   |          |
source2 ---> "state maintainer 2" --> "joiner 2" ----+
The left and right pipelines are symmetric. Each has a "state maintainer" processor and a "joiner" processor. The "state maintainer" has write access to its own state store, and the "joiner" has read access to the other side's state store. As a last step, the two join result streams are merged.
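To make the data flow concrete, here is a minimal plain-Java model of the five processors, assuming a symmetric join window and in-memory lists standing in for window stores (no retention, no fault tolerance). `Rec`, `WindowStore`, `StateMaintainer`, `Joiner`, and the 10 ms window are hypothetical names and values, not Kafka Streams classes. Because each record is written to its own store before its joiner probes the other store, every matching pair is emitted once, by whichever side arrives second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the five-processor stream-stream join: two state
// maintainers, two joiners and one merger. WindowStore is a simple stand-in,
// not org.apache.kafka.streams.state.WindowStore, and never expires records.
class FiveProcessorJoinModel {
    static final class Rec {
        final String key;
        final long ts;
        final String value;
        Rec(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    static final class WindowStore {
        private final List<Rec> records = new ArrayList<>();
        void put(Rec r) { records.add(r); }
        // Records with the given key whose timestamp lies in [from, to].
        List<Rec> fetch(String key, long from, long to) {
            List<Rec> hits = new ArrayList<>();
            for (Rec r : records) {
                if (r.key.equals(key) && r.ts >= from && r.ts <= to) hits.add(r);
            }
            return hits;
        }
    }

    // "state maintainer": the only writer of its own store; forwards downstream.
    static final class StateMaintainer {
        private final WindowStore own;
        private final Consumer<Rec> joiner;
        StateMaintainer(WindowStore own, Consumer<Rec> joiner) {
            this.own = own;
            this.joiner = joiner;
        }
        void process(Rec r) {
            own.put(r);
            joiner.accept(r);
        }
    }

    // "joiner": only reads the other side's store, emits one result per match.
    static final class Joiner {
        private final WindowStore other;
        private final long windowMs;
        private final boolean leftSide;
        private final Consumer<String> merger;
        Joiner(WindowStore other, long windowMs, boolean leftSide, Consumer<String> merger) {
            this.other = other;
            this.windowMs = windowMs;
            this.leftSide = leftSide;
            this.merger = merger;
        }
        void process(Rec r) {
            for (Rec o : other.fetch(r.key, r.ts - windowMs, r.ts + windowMs)) {
                String left = leftSide ? r.value : o.value;
                String right = leftSide ? o.value : r.value;
                merger.accept(r.key + ":" + left + "+" + right);
            }
        }
    }

    static List<String> demo() {
        long windowMs = 10; // hypothetical symmetric join window
        WindowStore store1 = new WindowStore();
        WindowStore store2 = new WindowStore();
        List<String> merged = new ArrayList<>(); // "merger": both joiners append here
        Joiner joiner1 = new Joiner(store2, windowMs, true, merged::add);
        Joiner joiner2 = new Joiner(store1, windowMs, false, merged::add);
        StateMaintainer maintainer1 = new StateMaintainer(store1, joiner1::process);
        StateMaintainer maintainer2 = new StateMaintainer(store2, joiner2::process);

        maintainer1.process(new Rec("k", 100, "A")); // source1: store 2 empty, no output
        maintainer2.process(new Rec("k", 105, "B")); // source2: matches A
        maintainer2.process(new Rec("k", 200, "C")); // source2: outside A's window
        maintainer1.process(new Rec("k", 108, "E")); // source1: matches B, C too far
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [k:A+B, k:E+B]
    }
}
```

In a real application the same wiring would be expressed with `Topology#addSource`, `addProcessor`, `addStateStore` and `connectProcessorAndStateStores`, connecting each joiner to the other side's store.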
Awesome information! I assume joiner 1 is able to get the proper partition of state store 2 since we are adding the processors and state stores to the same topology?
– Chris
Nov 26 '18 at 17:16
That happens automatically. Each task gets the partitions with the same number across all input topics assigned; i.e., for topics "A" and "B", task-0 gets partitions A-0 and B-0 (and so on). You get one task per partition. To make the join work, both input topics obviously must have the same number of partitions (and be partitioned the same way) to ensure data co-location within the same task.
– Matthias J. Sax
Nov 26 '18 at 17:19
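The co-partitioning requirement can be checked with a small model. It assumes a hash-modulo partitioner: Kafka's default partitioner hashes the serialized key bytes with murmur2, so `String.hashCode()` is only a dependency-free stand-in here, and `partitionFor`/`misplacedKeys` are hypothetical helpers. Since task i reads partition i of every input topic, a key can only be joined if it maps to the same partition number in both topics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoPartitioningCheck {
    // Stand-in for the default partitioner (Kafka itself uses murmur2 on key bytes).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Keys that land on different partition numbers in the two topics, and
    // therefore in different tasks, so they would never meet in the join.
    static List<String> misplacedKeys(List<String> keys, int partitionsA, int partitionsB) {
        List<String> misplaced = new ArrayList<>();
        for (String k : keys) {
            if (partitionFor(k, partitionsA) != partitionFor(k, partitionsB)) misplaced.add(k);
        }
        return misplaced;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        System.out.println(misplacedKeys(keys, 4, 4)); // []
        System.out.println(misplacedKeys(keys, 4, 3)); // [c, d]
    }
}
```

With four partitions on both topics no key is misplaced; with four versus three, keys `c` and `d` end up in different tasks.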
Not sure if it is possible, but it looks like you cannot get a read-only version of the state store from the processor context. Is that possible? Or do you just need to be careful about your processor implementation?
– Chris
Nov 26 '18 at 18:48
You always get write access... Just don't write :)
– Matthias J. Sax
Nov 26 '18 at 22:57
Fair enough :-)
– Chris
Nov 27 '18 at 2:00
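If you want the "just don't write" rule checked by the compiler, one option is to hand the joiner only a read-only type. Kafka Streams' `KeyValueStore` extends `ReadOnlyKeyValueStore`, so a joiner can cast the store it gets from the context and keep it in a field of the read-only type. The sketch below models that idea in plain Java with hypothetical `ReadOnlyStore`/`WritableStore` interfaces; it is a compile-time convention only, since a cast back to the writable type would still succeed.

```java
import java.util.HashMap;
import java.util.Map;

// Demo class first so the single-file source launcher finds main().
class ReadOnlyDemo {
    public static void main(String[] args) {
        InMemoryStore<String, String> store2 = new InMemoryStore<>();
        store2.put("k", "B");                                // written only by "state maintainer 2"
        ReadOnlyJoiner joiner1 = new ReadOnlyJoiner(store2); // sees store 2 through the read-only type
        System.out.println(joiner1.join("k", "A"));          // A+B
        System.out.println(joiner1.join("x", "A"));          // null
    }
}

// Hypothetical interfaces mirroring "KeyValueStore extends ReadOnlyKeyValueStore".
interface ReadOnlyStore<K, V> {
    V get(K key);
}

interface WritableStore<K, V> extends ReadOnlyStore<K, V> {
    void put(K key, V value);
}

class InMemoryStore<K, V> implements WritableStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    @Override public V get(K key) { return map.get(key); }
    @Override public void put(K key, V value) { map.put(key, value); }
}

class ReadOnlyJoiner {
    // Only the read-only type is stored, so otherSide.put(...) would not compile.
    private final ReadOnlyStore<String, String> otherSide;

    ReadOnlyJoiner(ReadOnlyStore<String, String> otherSide) {
        this.otherSide = otherSide;
    }

    // Returns the joined value, or null if the other side has no entry for the key.
    String join(String key, String value) {
        String other = otherSide.get(key);
        return other == null ? null : value + "+" + other;
    }
}
```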
answered Nov 26 '18 at 17:06 by Matthias J. Sax (edited Nov 26 '18 at 17:19)