Distributed computation¶
The pipeline computation in Haydi can be transparently executed through dask/distributed. This enables distributed computation or local parallel computation.
A pipeline is switched from the default serial context to a distributed one by passing a distributed context to the pipeline's run method.
Local computation¶
The following example shows how to start four local dask/distributed workers and run Haydi computation on them:
>>> import haydi as hd
>>> from haydi import DistributedContext
>>> dctx = DistributedContext(spawn_workers=4)
>>> hd.Range(100).map(lambda x: x + 1).collect().run(ctx=dctx)
[1, 2, 3, 4, ...]
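For reference, the pipeline above computes the same values as this plain-Python sketch (Range(100) enumerates 0 through 99; this is not Haydi API, just the equivalent computation):

```python
# Plain-Python equivalent of hd.Range(100).map(lambda x: x + 1).collect():
# Range(100) enumerates 0..99, map adds one, collect gathers the results.
result = [x + 1 for x in range(100)]
print(result[:4])  # [1, 2, 3, 4]
```
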
The spawn_workers argument tells DistributedContext to spawn the dask/distributed workers for you. Passing the context to the run method is what switches the pipeline from the default serial execution to the distributed one.
Distributed computation¶
If you want to distribute the computation amongst multiple computers, you first have to create a distributed cluster; an example of cluster setup can be found in the dask/distributed documentation. Once the cluster is created, simply pass the IP address and port of the cluster's scheduler to the context:
>>> dctx = DistributedContext(ip='192.168.1.1', port=8787)
# connects to cluster at 192.168.1.1:8787
>>> hd.Range(100).map(lambda x: x + 1).collect().run(ctx=dctx)
[1, 2, 3, 4, ...]
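A minimal cluster can be started with the dask/distributed command-line tools. The sketch below assumes the scheduler listens on the port used in the example above; check the defaults of your distributed version:

```shell
# on the scheduler machine (192.168.1.1)
dask-scheduler --port 8787

# on each worker machine, point the worker at the scheduler address
dask-worker 192.168.1.1:8787
```
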
Limitations¶
Nested distributed computations are not allowed, i.e. you cannot run a distributed pipeline inside another distributed pipeline. The following example shows an invalid case:
>>> def worker(x):
...     r = hd.Range(x)
...     # invalid! sequential run() must be used
...     return r.collect().run(DistributedContext(spawn_workers=4))
# THIS IS INVALID - nested distributed computations are not allowed
>>> hd.Range(100).map(worker).run(DistributedContext(spawn_workers=4))
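The valid pattern is to run the inner pipeline with the sequential run() and keep distribution only at the outer level. A plain-Python sketch of those semantics (equivalent values only, not Haydi API):

```python
# The inner pipeline must run serially inside each worker; only the
# outer pipeline is distributed.
def worker(x):
    # stands in for hd.Range(x).collect().run() -- the sequential run()
    return list(range(x))

# in Haydi, the outer map would be distributed by DistributedContext;
# here we evaluate it locally just to show the resulting values
results = [worker(x) for x in range(5)]
print(results)  # [[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3]]
```
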