Getting started with PySpark & GeoPandas on Databricks
Over the last few years, many data analysis platforms have added spatial support to their portfolios. Just two days ago, Databricks published an extensive post on spatial analysis. I took their post as a sign that it is time to look into how PySpark and GeoPandas can work together to achieve scalable spatial analysis workflows.
If you sign up for Databricks Community Edition, you get access to a toy cluster for experimenting with (Py)Spark. This considerably lowers the entry barrier to Spark since you don’t need to bother with installing anything yourself. They also provide a notebook environment:
I’ve followed the official Databricks GeoPandas example notebook but expanded it to read from a real geodata format (GeoPackage) rather than from CSV.
I’m using test data from the MovingPandas repository: demodata_geolife.gpkg contains a handful of trajectories from the Geolife dataset. demodata_grid.gpkg contains a simple 3×4 grid that covers the same geographic extent as the Geolife sample:
Once the files are downloaded, we can use GeoPandas to read the GeoPackages:
Note that the display() function is used to show the plot.
The same applies to the grid data:
When the GeoDataFrames are ready, we can start using them in PySpark. To do so, it is necessary to convert from GeoDataFrame to PySpark DataFrame. Therefore, I’ve implemented a simple function that performs the conversion and turns the Point geometries into lon and lat columns:
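Such a conversion function might look like this (a sketch; it assumes Point geometries and the `spark` session object that Databricks notebooks provide):

```python
import pandas as pd

def gdf_to_pandas(gdf):
    """Flatten a GeoDataFrame's Point geometries into lon/lat columns."""
    pdf = pd.DataFrame(gdf.drop(columns="geometry"))
    pdf["lon"] = gdf.geometry.x
    pdf["lat"] = gdf.geometry.y
    return pdf

def gdf_to_spark(gdf, spark):
    """Convert a GeoDataFrame to a PySpark DataFrame via pandas."""
    return spark.createDataFrame(gdf_to_pandas(gdf))
```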
To compute new values for our DataFrame, we can use existing or user-defined functions (UDF). Here’s a simple hello world function and associated UDF:
A spatial UDF is a little more involved. For example, here’s a UDF that finds the first polygon that intersects the specified lat/lon and returns that polygon’s ID. Note how we first broadcast the grid DataFrame to ensure that it is available on all computation nodes:
It’s worth noting that PySpark has its peculiarities. Since it’s a Python wrapper around a strongly typed API, we need to pay close attention to types in our Python code. For example, when defining UDFs, if the specified return type (IntegerType in the above example) does not match the actual value returned by the find_intersection() function, this will cause rather cryptic errors.
To plot the results, I’m converting the joined PySpark DataFrame back to GeoDataFrame:
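The reverse conversion can be sketched like this (the CRS assumes WGS84 coordinates, as in the Geolife data; `joined_df` stands in for the joined PySpark DataFrame):

```python
import geopandas as gpd

def pandas_to_gdf(pdf, crs="EPSG:4326"):
    """Rebuild a GeoDataFrame from a DataFrame with lon/lat columns."""
    return gpd.GeoDataFrame(
        pdf.drop(columns=["lon", "lat"]),
        geometry=gpd.points_from_xy(pdf.lon, pdf.lat),
        crs=crs,
    )

# Collect the distributed DataFrame to the driver, then rebuild geometries:
# result_gdf = pandas_to_gdf(joined_df.toPandas())
```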
I’ve published this notebook so you can give it a try. (Any notebook published on Databricks is supposed to stay online for six months, so if you’re trying to access it after June 2020, this link may be broken.)
Very nice work! It could be very interesting to extend this article and explain the data processing flow using the explain() function and how you can achieve better performance.
Thank you Christos! The spatial UDF is largely based on the Databricks blog post.
Applying the find_intersection_udf() generates a very short plan that looks pretty much like the hello_world_udf() plan, but if you spot potential for improvement, please share!
== Physical Plan ==
*(2) Project [id#16L, sequence#17L, trajectory_id#18L, tracker#19L, t#20, lon#21, lat#22, pythonUDF0#79 AS cell_id#70]
+- BatchEvalPython [find_intersection(lon#21, lat#22)], [pythonUDF0#79]
+- *(1) Scan ExistingRDD arrow[id#16L,sequence#17L,trajectory_id#18L,tracker#19L,t#20,lon#21,lat#22]