How to join two Spark Structured Streams?
Is it possible to join two Spark Structured Streams in Spark 2.2.1? I have run into a lot of issues doing even very simple manipulations in Spark Structured Streaming, and the documentation and the number of examples seem very limited to me.
I have two sources of streaming data:
persons.json:
[
{"building_id": 70, "id": 21, "latitude": 41.20, "longitude": 2.2, "timestamp": 1532609003},
{"building_id": 70, "id": 15, "latitude": 41.24, "longitude": 2.3, "timestamp": 1532609005},
{"building_id": 71, "id": 11, "latitude": 41.28, "longitude": 2.1, "timestamp": 1532609005}
]
machines.json:
[
{"building_id": 70, "mid": 222, "latitude": 42.1, "longitude": 2.11}
]
The goal is to get a merged DataFrame with latitude and longitude of persons and machines. I need it in order to estimate the distance between them in real time:
building_id  id  mid  latitude  longitude  latitude_machine  longitude_machine
70           21  222  41.20     2.2        42.1              2.11
# ...
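For the distance estimate itself, one common choice is the haversine (great-circle) formula. The following is only a sketch in plain Python: the function name `haversine_km` and the Earth-radius constant are assumptions for illustration, not something taken from the question, and the result is in kilometres.

```python
import math

# Assumed constant: mean Earth radius in kilometres.
EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (latitude, longitude) points in degrees."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    # Haversine of the central angle between the two points.
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Example call using the first person row and the machine row from the sample data.
distance = haversine_km(41.20, 2.2, 42.1, 2.11)
```

Applied to a merged row, the arguments would be `latitude`, `longitude`, `latitude_machine` and `longitude_machine`.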
If it's impossible to join two streams, then I would really appreciate some recommendation of a possible workaround.
Code:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StructField,
    StructType,
)

# Parentheses let the method chains span several lines in Python.
spark = (
    SparkSession
    .builder
    .appName("Test")
    .master("local[2]")
    .getOrCreate()
)

schema_persons = StructType([
    StructField("building_id", IntegerType()),
    StructField("id", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("timestamp", LongType())
])

schema_machines = StructType([
    StructField("building_id", IntegerType()),
    StructField("mid", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType())
])

df_persons = (
    spark
    .readStream
    .format("json")
    .schema(schema_persons)
    .load("data/persons")
)

# Rename the machine coordinates so they do not clash with the person columns.
df_machines = (
    spark
    .readStream
    .format("json")
    .schema(schema_machines)
    .load("data/machines")
    .withColumnRenamed("latitude", "latitude_machine")
    .withColumnRenamed("longitude", "longitude_machine")
)

# The join in question: both sides are streaming DataFrames.
df_joined = (
    df_persons
    .join(df_machines, ["building_id"], "left")
)

query_persons = (
    df_persons
    .writeStream
    .format('console')
    .start()
)

query_machines = (
    df_machines
    .writeStream
    .format('console')
    .start()
)

query_persons.awaitTermination()
query_machines.awaitTermination()
python apache-spark pyspark spark-structured-streaming
In general, stream-to-stream joins are supported in the latest versions (2.3, 2.4), but require a watermark on at least one side - see the join matrix. If you're looking for concrete examples, StreamingJoinSuite would be the place to go. In 2.2, however, joins between streaming datasets are not supported.
– user6910411
Nov 23 '18 at 20:34
@user6910411: Thanks. But what is the workaround in Spark 2.2? Just nothing to do?
– Mozimaki
Nov 24 '18 at 10:43
Other than upgrade? Not really.
– user6910411
Nov 24 '18 at 21:29
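One option the comments above do not cover: Structured Streaming has supported joining a streaming DataFrame with a static one since before stream-stream joins were added, including a left outer join with the streaming side on the left. If the machines data changes rarely enough to be read as a static DataFrame (an assumption about the use case, not something stated in the question), a sketch for Spark 2.2 could look like the following. The names `join_with_static_machines` and `main` are hypothetical, and a static read may not pick up machine files added later.

```python
def join_with_static_machines(df_persons_stream, df_machines_static):
    """Left-join streaming persons with a static machines DataFrame on building_id."""
    machines = (
        df_machines_static
        .withColumnRenamed("latitude", "latitude_machine")
        .withColumnRenamed("longitude", "longitude_machine")
    )
    # The streaming DataFrame stays on the left side of the left outer join.
    return df_persons_stream.join(machines, ["building_id"], "left")


def main():
    # Imported lazily so the helper above can be loaded without a Spark installation.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        DoubleType, IntegerType, LongType, StructField, StructType,
    )

    spark = (
        SparkSession.builder
        .appName("Test")
        .master("local[2]")
        .getOrCreate()
    )

    schema_persons = StructType([
        StructField("building_id", IntegerType()),
        StructField("id", IntegerType()),
        StructField("latitude", DoubleType()),
        StructField("longitude", DoubleType()),
        StructField("timestamp", LongType()),
    ])
    schema_machines = StructType([
        StructField("building_id", IntegerType()),
        StructField("mid", IntegerType()),
        StructField("latitude", DoubleType()),
        StructField("longitude", DoubleType()),
    ])

    # Persons remain a stream; machines are read as a batch (static) DataFrame.
    df_persons = spark.readStream.schema(schema_persons).json("data/persons")
    df_machines = spark.read.schema(schema_machines).json("data/machines")

    query = (
        join_with_static_machines(df_persons, df_machines)
        .writeStream
        .format("console")
        .start()
    )
    query.awaitTermination()
```

Calling `main()` starts the query and blocks until it is stopped. If both inputs really must be streams, the remaining route is the one from the comments: upgrade to 2.3 or later.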
asked Nov 23 '18 at 20:07 by Mozimaki (edited Nov 23 '18 at 20:27)