PySpark sum of last values by ID over timeseries window
I have this DataFrame in PySpark:
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]
I need to sum only the last known value for each id within each one-second window.
The output should look like this:
[Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]
I have tried this:
w = Window.partitionBy(F.col('id')).orderBy('timestamp')
_df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)
and it produces a new column with the last known value for each id, but I don't know how to sum it.
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]
Another solution is:
_df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
.groupBy('timestamp').agg(F.sum('last').alias('sum'))
.sort('timestamp').take(50)
The output looks promising, but a double aggregation seems like asking for trouble... and it should run on terabytes of data, so speed is a concern too.
[Row(timestamp=1532354662, sum=176142),
Row(timestamp=1532354663, sum=176142),
Row(timestamp=1532354664, sum=176139),
Row(timestamp=1532354665, sum=176137),
Row(timestamp=1532354666, sum=176133),
Row(timestamp=1532354667, sum=176128),
Row(timestamp=1532354668, sum=176125),
Row(timestamp=1532354669, sum=176122),
Row(timestamp=1532354670, sum=176120),
Row(timestamp=1532354671, sum=176118),
Row(timestamp=1532354672, sum=176117),
Row(timestamp=1532354673, sum=176114),
Any help would be much appreciated!
EDIT
Terry's answer is the best right now. If someone has a better idea, please post it.
python apache-spark pyspark apache-spark-sql pyspark-sql
add a comment |
I have this DataFrame in PySpark:
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]
I need to sum only the last known value for each id within each one-second window.
The output should look like this:
[Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]
I have tried this:
w = Window.partitionBy(F.col('id')).orderBy('timestamp')
_df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)
and it produces a new column with the last known value for each id, but I don't know how to sum it.
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]
Another solution is:
_df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
.groupBy('timestamp').agg(F.sum('last').alias('sum'))
.sort('timestamp').take(50)
The output looks promising, but a double aggregation seems like asking for trouble... and it should run on terabytes of data, so speed is a concern too.
[Row(timestamp=1532354662, sum=176142),
Row(timestamp=1532354663, sum=176142),
Row(timestamp=1532354664, sum=176139),
Row(timestamp=1532354665, sum=176137),
Row(timestamp=1532354666, sum=176133),
Row(timestamp=1532354667, sum=176128),
Row(timestamp=1532354668, sum=176125),
Row(timestamp=1532354669, sum=176122),
Row(timestamp=1532354670, sum=176120),
Row(timestamp=1532354671, sum=176118),
Row(timestamp=1532354672, sum=176117),
Row(timestamp=1532354673, sum=176114),
Any help would be much appreciated!
EDIT
Terry's answer is the best right now. If someone has a better idea, please post it.
python apache-spark pyspark apache-spark-sql pyspark-sql
add a comment |
I have this DataFrame in PySpark:
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]
I need to sum only the last known value for each id within each one-second window.
The output should look like this:
[Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]
I have tried this:
w = Window.partitionBy(F.col('id')).orderBy('timestamp')
_df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)
and it produces a new column with the last known value for each id, but I don't know how to sum it.
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]
Another solution is:
_df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
.groupBy('timestamp').agg(F.sum('last').alias('sum'))
.sort('timestamp').take(50)
The output looks promising, but a double aggregation seems like asking for trouble... and it should run on TB of data, so speed is a concern too.
[Row(timestamp=1532354662, sum=176142),
Row(timestamp=1532354663, sum=176142),
Row(timestamp=1532354664, sum=176139),
Row(timestamp=1532354665, sum=176137),
Row(timestamp=1532354666, sum=176133),
Row(timestamp=1532354667, sum=176128),
Row(timestamp=1532354668, sum=176125),
Row(timestamp=1532354669, sum=176122),
Row(timestamp=1532354670, sum=176120),
Row(timestamp=1532354671, sum=176118),
Row(timestamp=1532354672, sum=176117),
Row(timestamp=1532354673, sum=176114),
Any help would be much appreciated!
EDIT
Terry's answer is the best right now. If someone has a better idea, please post it.
python apache-spark pyspark apache-spark-sql pyspark-sql
I have this DataFrame in PySpark:
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662)]
I need to sum only the last known value for each id within each one-second window.
The output should look like this:
[Row(time=datetime.datetime(2018, 7, 23, 14, 4, 22), sum=176213),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 23), sum=176112),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 24), sum=175933),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 25), sum=175543),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 26), sum=175219),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 27), sum=175002),
Row(time=datetime.datetime(2018, 7, 23, 14, 4, 28), sum=174892)...]
I have tried this:
w = Window.partitionBy(F.col('id')).orderBy('timestamp')
_df.withColumn('last_known', F.last('value').over(w)).sort('time').take(1000)
and it produces a new column with the last known value for each id, but I don't know how to sum it.
[Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 6095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 15215), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 25456), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 35641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 44516), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 106098), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 108248), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 118453), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 129638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 138515), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 206095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 215213), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 225445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 234635), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 244514), timestamp=1532354662, last_known=35187),
Row(id='487', value=35185, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 306095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 309226), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 319454), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 329651), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 337523), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 406077), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 415209), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 425481), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 435638), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 445548), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 506073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 508245), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 519452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 529641), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 537512), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 606087), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 615193), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 625452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35276, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 635632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 645538), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 706073), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 709212), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 718452), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 729642), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 738524), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 806095), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 815210), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 825455), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 834640), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 844520), timestamp=1532354662, last_known=35187),
Row(id='487', value=35184, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 906083), timestamp=1532354662, last_known=35184),
Row(id='489', value=35285, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 908243), timestamp=1532354662, last_known=35285),
Row(id='48B', value=35211, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 918445), timestamp=1532354662, last_known=35211),
Row(id='48D', value=35275, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 929632), timestamp=1532354662, last_known=35275),
Row(id='48F', value=35187, time=datetime.datetime(2018, 7, 23, 14, 4, 22, 938511), timestamp=1532354662, last_known=35187)]
Another solution is:
_df.orderBy('time').groupBy('timestamp', 'id').agg(F.last('value').alias('last'))
.groupBy('timestamp').agg(F.sum('last').alias('sum'))
.sort('timestamp').take(50)
The output looks promising, but a double aggregation seems like asking for trouble... and it should run on TB of data, so speed is a concern too.
[Row(timestamp=1532354662, sum=176142),
Row(timestamp=1532354663, sum=176142),
Row(timestamp=1532354664, sum=176139),
Row(timestamp=1532354665, sum=176137),
Row(timestamp=1532354666, sum=176133),
Row(timestamp=1532354667, sum=176128),
Row(timestamp=1532354668, sum=176125),
Row(timestamp=1532354669, sum=176122),
Row(timestamp=1532354670, sum=176120),
Row(timestamp=1532354671, sum=176118),
Row(timestamp=1532354672, sum=176117),
Row(timestamp=1532354673, sum=176114),
Any help would be much appreciated!
EDIT
Terry's answer is the best right now. If someone has a better idea, please post it.
python apache-spark pyspark apache-spark-sql pyspark-sql
python apache-spark pyspark apache-spark-sql pyspark-sql
edited Nov 15 '18 at 22:22
Doman
asked Nov 15 '18 at 14:42
DomanDoman
4815
4815
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
I believe you can replace your first groupBy with a 'dropDuplicates' and set ascending = False in orderBy. Like this:
df.orderBy(['time'], ascending = False)
.dropDuplicates(['timestamp', 'id'])
.groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321915%2fpyspark-sum-of-last-values-by-id-over-timeseries-window%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
I believe you can replace your first groupBy with a 'dropDuplicates' and set ascending = False in orderBy. Like this:
df.orderBy(['time'], ascending = False)
.dropDuplicates(['timestamp', 'id'])
.groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()
add a comment |
I believe you can replace your first groupBy with a 'dropDuplicates' and set ascending = False in orderBy. Like this:
df.orderBy(['time'], ascending = False)
.dropDuplicates(['timestamp', 'id'])
.groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()
add a comment |
I believe you can replace your first groupBy with a 'dropDuplicates' and set ascending = False in orderBy. Like this:
df.orderBy(['time'], ascending = False)
.dropDuplicates(['timestamp', 'id'])
.groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()
I believe you can replace your first groupBy with a 'dropDuplicates' and set ascending = False in orderBy. Like this:
df.orderBy(['time'], ascending = False)
.dropDuplicates(['timestamp', 'id'])
.groupBy('timestamp').agg(F.sum('value').alias('sum')).sort('timestamp').show()
answered Nov 15 '18 at 16:14
TerryTerry
427412
427412
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321915%2fpyspark-sum-of-last-values-by-id-over-timeseries-window%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown