I recently employed a little trick to make Docker data-only images whose contents can be combined into other Docker images as needed.
Imagine you have a set of configuration that you want to store separately from your code. Well, you can make a Docker image containing just this configuration very easily:
FROM scratch
ADD ./ /fake-config
After building it with docker build -t test/fake-config ., the only thing inside this image will be the directory you added. This is the most barebones image it is possible to make outside of scratch itself.
Now, you may wonder what the use of an image like this is: after all, it can’t be executed or run because it has no binaries inside. Enter the COPY --from command:
FROM <some base image>
# ...
COPY --from=test/fake-config /fake-config /fake-config
# ...
With one command, the contents of your data-only image have been added to another image. This is super useful for sharing configuration across the Docker images of several applications.
Occasionally within a Spark application, you may encounter issues because some member of a (case) class or object is not serialisable. This manifests most often as an exception when the first task containing that item is sent to an executor. The vast majority of the time, the fix you will probably reach for is to make the object implement Serializable. Sometimes, however, this may not be easy or even possible (for example, when the type in question is out of your control).
It turns out that there is another way! If the object in question can be constructed again inexpensively, the @transient lazy val pattern may be for you:
case class Foo(foo: SomeType) {
  @transient lazy val bar: SomeOtherType = SomeOtherType(foo)
}
The @transient annotation has the effect of excluding the annotated item from its containing object when that object is serialised. In conjunction with the lazy val, this means the field will be constructed again when first accessed on each executor, rather than being sent to each executor as a series of serialised bytes to deserialise as part of the task.
Sometimes this trick can actually result in modest performance improvements: for example, if the object in question is large when serialised but cheap to construct again (like a large collection computed from some smaller seed). However, note carefully that it is constructed again once per executor, making it only useful for stateless items.
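To make this concrete, here is a minimal sketch of the pattern in use (the Config type and its values are hypothetical): the case class is captured by the task closure and serialised as usual, but the table field is rebuilt from its seed on each executor the first time it is accessed.
case class Config(seed: Int) {
  // Large when serialised, but cheap to rebuild from the seed, so we keep it
  // out of the serialised form entirely.
  @transient lazy val table: IndexedSeq[Int] = (0 until 1000000).map(_ * seed)
}
val config = Config(seed = 3)
// config travels with each task; table is recomputed on first access per executor.
sparkContext.parallelize(1 to 10).map(i => config.table(i)).collect()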
The next time you hit a serialisation issue in Spark, give @transient lazy val a try and see if it’s a fit for you!
When working with Spark it’s tempting to use Broadcast variables everywhere you need them without thinking too much about it. However, if you are not careful, this can lead to strong coupling between all of your application components and the implementation of Spark itself.
How Broadcast variables work
Broadcast variables are intended to solve a very specific problem in Spark: namely, preparing some static value on the driver side to be passed efficiently to all nodes that need it during some processing step. Spark takes care of distributing this variable in the most efficient manner, e.g. it can elect to use a BitTorrent-like mechanism for larger variables (if enabled), and promises that only the nodes that need the data will receive it, and only at the point they need it.
However, in order to work, broadcast variables effectively provide a wrapper type for the variable they close over:
// This is a vastly simplified version of the class!
abstract class Broadcast[T: ClassTag](val id: Long) {
  // ...
  def value: T = // ...
  // ...
}
Effectively, Spark will store our data and reference it via an ID (a Long), which is all we need to send to the nodes until they actually need the data. When they need the data, they send the driver the ID, and they get what they need back!
Working with Broadcast variables
In order to set up a broadcast variable and get the value back out of it, we need to do something like the following:
val data: Broadcast[Array[Int]] = sparkContext.broadcast(Array(1, 2, 3))
// Later on, to use it somewhere...
data.value // => Array[Int] = Array(1, 2, 3)
Leaking the types
When working with Spark, it can be easy to end up with one “god job” where the implementation of every step is inlined and heavily dependent on Spark features.
With some effort, we can factor out our pure functions into smaller classes which we can call into:
class PostcodeToLatLong(lookup: Map[String, (Double, Double)]) {
  def lookupPostcode(postcode: String): (Double, Double) = lookup(postcode)
}
Here we have a simple class which needs a lookup table to function: in this case, it will convert a string-based postcode to a latitude/longitude pair, represented as a tuple of Doubles.
If this class were going to be used in a Spark job, we might consider making the lookup table a broadcast variable so that only the nodes that need it will request its data. However, we hit a snag: we can’t do this without leaking the Broadcast wrapper type into the implementation details of the class, like so:
class PostcodeToLatLong(lookup: Broadcast[Map[String, (Double, Double)]]) {
  // ...
}
This is a nightmare for testing: we’ve gone from a simple, pure class which can be unit tested easily to a class that depends entirely on Spark implementation details and needs a SparkContext just to set up!
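As a rough sketch of the pain (with hypothetical values), even the most trivial test would now have to look something like this, just to construct the class:
import org.apache.spark.{SparkConf, SparkContext}
// A real SparkContext, spun up purely so we can build a Broadcast to pass in.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("test"))
val bcLookup = sc.broadcast(Map("AB1 2CD" -> (57.15, -2.09)))
val mapper = new PostcodeToLatLong(bcLookup)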
Abstracting away the broadcast functionality
Thankfully, we can solve this using a trait that abstracts what we actually care about: that the thing we have passed is lazy and will only be grabbed when we call .value on it!
trait Lazy[T] extends Serializable {
  def value: T
}
And now we can change our class to take this trait instead:
class PostcodeToLatLong(lookup: Lazy[Map[String, (Double, Double)]]) {
  // ...
}
With some simple implicit classes we can make it easy to call this class with either a Spark broadcast variable or any plain old object:
import scala.reflect.ClassTag
import org.apache.spark.broadcast.Broadcast

object Lazy {
  object implicits {
    // Implicit classes also act as implicit conversions, so a Broadcast[T] or a
    // plain T can be passed anywhere a Lazy[T] is expected.
    implicit class LazySparkBroadcast[T: ClassTag](bc: Broadcast[T]) extends Lazy[T] {
      override def value: T = bc.value
    }
    implicit class LazyPrimitive[T: ClassTag](inner: T) extends Lazy[T] {
      override def value: T = inner
    }
  }
}
And now we can put the pieces together:
import Lazy.implicits._
val lookup = Map("foo" -> (123.45, 123.45))
val bcLookup = sparkContext.broadcast(lookup)
// And later on...
val postcodeMapper = new PostcodeToLatLong(bcLookup)
// This also works, thanks to the LazyPrimitive conversion!
val postcodeMapperLocal = new PostcodeToLatLong(lookup)
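Because the Lazy abstraction hides Spark entirely, the class can now be unit tested with a plain Map and no SparkContext at all. Here is a minimal sketch (hypothetical postcode and coordinates, and assuming lookupPostcode simply reads from lookup.value, as elided above):
import Lazy.implicits._
// No Spark involved: the plain Map is lifted into Lazy[...] by LazyPrimitive.
val testLookup = Map("AB1 2CD" -> (57.15, -2.09))
val testMapper = new PostcodeToLatLong(testLookup)
val expected = (57.15, -2.09)
assert(testMapper.lookupPostcode("AB1 2CD") == expected)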