Spark SQL window function with complex condition

This is probably easiest to explain through example. Suppose I have a DataFrame of user logins to a website, for instance:



scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows


I would like to add a column to this indicating when they became an active user on the site. But there is one caveat: there is a time period during which a user is considered active, and if they log in again after this period expires, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the above would be something like this:



+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+


So, in particular, SirChillingtonIV's became_active date was reset because their second login came after the active period expired, but Booooooo99900098's became_active date was not reset the second time they logged in, because it fell within the active period.



My initial thought was to use window functions with lag, and then to use the lagged values to fill the became_active column; for instance, something starting roughly like:



import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))


Then, the rule to fill in the became_active date would be: if tmp is null (i.e., if it's the first-ever login) or if login_date - tmp >= 5, then became_active = login_date; otherwise, go to the next most recent value in tmp and apply the same rule. This suggests a recursive approach, which I'm having trouble imagining how to implement.
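For concreteness, a one-step version of this rule is easy to write (a sketch, assuming spark.implicits._ is in scope for the $ syntax); the otherwise branch is exactly where it falls short, because it would have to keep reaching further back:

import org.apache.spark.sql.functions.{datediff, when}

// One-step rule: correct for the first-ever login and for gaps of 5+ days,
// but a shorter gap should inherit the previous row's became_active,
// not its login_date -- hence the apparent need for recursion.
val oneStep = df2.withColumn(
  "became_active",
  when($"tmp".isNull || datediff($"login_date", $"tmp") >= 5, $"login_date")
    .otherwise($"tmp"))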



My questions: Is this a viable approach, and if so, how can I "go back" and look at earlier values of tmp until I find one where the rule terminates? To my knowledge, I can't iterate through the values of a Spark SQL Column. Is there another way to achieve this result?



Tags: sql, apache-spark, apache-spark-sql, window-functions






asked Feb 24 '17 at 21:25 by user4601931, edited Sep 28 at 11:03 by Community


          1 Answer

















          Accepted answer, score 25. Answered Feb 24 '17 at 22:51 by user6910411.










          Here is the trick. Import a bunch of functions:



          import org.apache.spark.sql.expressions.Window
          import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}


          Define windows:



          val userWindow = Window.partitionBy("user_name").orderBy("login_date")
          val userSessionWindow = Window.partitionBy("user_name", "session")


          Find the points where new sessions start:



          val newSession = (coalesce(
            datediff($"login_date", lag($"login_date", 1).over(userWindow)),
            lit(0)
          ) > 5).cast("bigint")

          val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
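
          To make the running sum concrete, here is what the intermediate values work out to for SirChillingtonIV on the sample dataset defined below (derived by hand, not actual output):

          // login_date   datediff vs. previous   newSession   session (running sum)
          // 2012-01-04   null                    0            0
          // 2012-01-11   7                       1            1
          // 2012-01-14   3                       0            1
          // 2012-08-11   210                     1            2

          Every login that comes more than 5 days after the previous one flips newSession to 1, so the cumulative sum assigns a stable id to each activity period.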


          Find the earliest date per session:



          val result = sessionized
            .withColumn("became_active", min($"login_date").over(userSessionWindow))
            .drop("session")


          With the dataset defined as:



          val df = Seq(
            ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
            ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"),
            ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
            ("SirChillingtonIV", "2012-08-11")
          ).toDF("user_name", "login_date")
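
          Note that login_date is kept as a plain string here; datediff still works because Spark implicitly casts date-formatted strings to dates. If you want a proper DateType column in the result, you can convert it up front (a sketch):

          import org.apache.spark.sql.functions.to_date

          val dfTyped = df.withColumn("login_date", to_date($"login_date"))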


          The result is:



          +----------------+----------+-------------+
          | user_name|login_date|became_active|
          +----------------+----------+-------------+
          | OprahWinfreyJr|2012-01-10| 2012-01-10|
          |SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user
          |SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user
          |SirChillingtonIV|2012-01-14| 2012-01-11|
          |SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user
          |Booooooo99900098|2012-01-04| 2012-01-04|
          |Booooooo99900098|2012-01-06| 2012-01-04|
          +----------------+----------+-------------+
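
          For reference, the same sessionization can be written in raw Spark SQL (a sketch; it assumes the DataFrame has been registered as a temporary view named logins):

          df.createOrReplaceTempView("logins")

          val resultSql = spark.sql("""
            WITH flagged AS (
              SELECT *,
                     CAST(COALESCE(
                       DATEDIFF(login_date,
                                LAG(login_date, 1) OVER (PARTITION BY user_name ORDER BY login_date)),
                       0) > 5 AS BIGINT) AS new_session
              FROM logins
            ), sessionized AS (
              SELECT *,
                     SUM(new_session) OVER (PARTITION BY user_name ORDER BY login_date) AS session
              FROM flagged
            )
            SELECT user_name, login_date,
                   MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
            FROM sessionized
          """)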






          • I know it has been a long time, but can you help me understand the coalesce part of the solution?
            – Sanchit Grover
            Apr 15 at 8:33






          • @SanchitGrover If datediff($"login_date", lag($"login_date", 1).over(userWindow)) evaluates to null (the first row in the frame), you get 0.
            – user6910411
            Apr 15 at 10:19










          • Then how is val sessionized = df.withColumn("session", sum(newSession).over(userWindow)) increasing the count?
            – Sanchit Grover
            Apr 15 at 12:02










          • It is a cumulative sum of values in the set {0, 1}.
            – user6910411
            Apr 15 at 12:04










          • I would double vote this answer if I could, thx!
            – Madhava Carrillo
            Nov 22 at 10:25










