Ray

What is Ray? What problems does it solve?

Ray (website, GitHub) is an open-source system for scaling Python applications from single machines to large clusters. Its design is driven by the needs of next-generation ML/AI systems, which face several distinct challenges, including diverse computational patterns, management of distributed, evolving state, and the desire to address all of those needs with minimal programming effort.
Typical ML/AI systems require diverse computational patterns to support data cleaning and preparation, hyperparameter tuning, model training and serving, and other chores. The original MapReduce model for big data works well for data cleaning, preparation, and analysis workloads, but machine learning workloads require a mixture of fine-grained and coarse-grained computations, along with varied communication patterns between components. Hyperparameter tuning and model training are very compute-intensive, requiring cluster resources to complete in reasonable time frames. Ray provides the foundation for building modern ML/AI systems and applications by satisfying these diverse requirements in a performant manner, with a minimal and intuitive API.
A second challenge is distributed, evolving state. In the context of ML/AI, distributed state includes the hyperparameters, the model parameters (e.g., the weights of a neural network), and, for reinforcement learning scenarios, the state of simulations (or interactions with the real world) used for training. Often the state is mutable, especially during training, so careful, concurrency-safe updates are required. One possible approach to distributed computing is to exploit popular “serverless” systems, but none currently offers facilities for managing distributed, mutable state. Developers using serverless systems must resort to keeping all state in a database, but the database can become a bottleneck and a single point of failure.
Instead, Ray uses the popular actor model to provide an intuitive mechanism for state management. Ray actors provide a stateful complement to Ray tasks, which are stateless. This state is transparently reachable from any other Ray actor or task through a reference to the corresponding Python object (i.e., an instance of a Python class). Ray keeps track of where the actor is located in the cluster, eliminating the need to explicitly know and manage such locations in user code. Mutation of state in the actor is handled in a thread-safe way, without the need for explicit concurrency primitives. Hence, Ray provides intuitive, distributed state management for applications, which means that Ray can be an excellent platform for implementing stateful serverless applications in general. Furthermore, when communicating between actors or tasks on the same machine, the state is transparently managed through shared memory, with zero-copy serialization between the actors and tasks, for optimal performance.
Finally, because most ML/AI systems are Python-based, developers need a way to add these scale-out capabilities with minimal code changes. A decorator, @ray.remote, marks functions and classes as logical units that can be instantiated and executed in a cluster. Ray transparently handles thread-safe mutation of state, distribution of state, and intuitive scheduling of dependent tasks.
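For example, here is a minimal sketch of the decorator applied to an ordinary function (the add function is purely illustrative, not part of Ray):
import ray

ray.init()

@ray.remote
def add(x, y):
  # An ordinary Python function, now schedulable anywhere
  # in the cluster.
  return x + y

future = add.remote(1, 2)  # Returns a future immediately.
print(ray.get(future))     # Blocks, then prints 3.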
The Ray distribution includes several high-performance libraries targeting AI applications, which were also motivating problems that drove the creation of Ray. They include RLlib for reinforcement learning and Tune for hyperparameter tuning. Both demonstrate Ray’s unique capabilities. These libraries and other, custom applications written with Ray are already used in many production deployments.
Ray is an open source project started in the UC Berkeley RISELab. It is now developed at Anyscale with major contributions from many other organizations. Commercial users of Ray include Ant Financial, JP Morgan, Intel, Microsoft, Ericsson, Skymind, Primer, and many others.

An Example of the Core Ray API

Now that we understand the motivations and advantages of Ray, let’s examine how you would use the Ray API in your applications. Then we’ll look more closely at how Ray improves performance through parallelization and distribution. The Ray API is carefully designed to enable users to scale their applications, even across a cluster, with minimal code changes.
Consider the example of a parameter server, which is a key-value store used for training machine learning models in a cluster. The values are the parameters of a machine learning model (e.g., a neural network). The keys index the model parameters. If you are unfamiliar with parameter servers, think of any standalone service you might need for serving requests for information or data.
For example, in a movie recommendation system, there might be one key per user and one key per movie. For each user and movie, there are corresponding user-specific and movie-specific parameters. In a language-modeling application, words might be the keys and their embeddings may be the values.
In its simplest form, a parameter server may have a single key and allow all of the parameters to be retrieved and updated at once.
Here is an example of such a simple parameter server, for a single NumPy array of parameters. It is implemented as a Ray actor in under 15 lines of code:
import numpy as np
import ray

@ray.remote
class ParameterServer(object):
  def __init__(self, dim):
    # Alternatively, params could be a dictionary
    # mapping keys to arrays.
    self.params = np.zeros(dim)

  def get_params(self):
    return self.params

  def update_params(self, grad):
    self.params += grad
The @ray.remote decorator defines a service. It takes the ordinary Python class, ParameterServer, and allows it to be instantiated as a remote service. Because the instance maintains state (the current parameters, which are mutable), we have an actor.
In this example, we assume that an update to the parameters is provided as a gradient that should be added to the current parameter vector. (This gradient can be a single number that is added to all array elements or an array of gradients.) More sophisticated designs are possible, of course, but Ray would be used the same way. As a simple exercise, try changing this to a key-value (dictionary) implementation.
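For reference, here is one possible sketch of such a key-value variant; the class and method names are illustrative, not part of Ray:
@ray.remote
class KeyValueParameterServer(object):
  def __init__(self):
    # Map each key to its own parameter array.
    self.params = {}

  def get_params(self, keys):
    return {k: self.params[k] for k in keys}

  def update_params(self, grads):
    # grads is a dictionary mapping keys to gradient arrays.
    # Unseen keys are initialized from their first gradient.
    for key, grad in grads.items():
      if key in self.params:
        self.params[key] += grad
      else:
        self.params[key] = grad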
A parameter server typically exists as a remote process or service. Clients interact with it through remote procedure calls. To instantiate the parameter server as a remote actor, we perform the following steps at the interactive Python prompt. (We'll assume you have already defined the ParameterServer class in the same session.) First, you have to start Ray. When using a cluster, you would pass optional parameters to the init() method to specify the cluster location:
>>> ray.init()
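For example, to connect to an existing cluster, you would pass its location; in recent Ray releases this is done with the address keyword (check the Ray documentation for the form your version expects):
>>> ray.init(address="auto")  # Connect to a running cluster on this node.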
Next, create a ParameterServer instance for an array of 10 parameters:
>>> ps = ParameterServer.remote(10)
>>> ps
Actor(ParameterServer, 7d58f41501000000)
Instead of calling ParameterServer(10) to construct an instance, the way you would for a normal Python class, you use the remote() method, which was added to the class by the @ray.remote decorator. You pass the same arguments you would pass to the regular constructor. Note that an actor object is constructed.
Similarly, to invoke methods on the actor, you use remote() appended to the original method name, passing the same arguments you would pass to the original method:
>>> params_id = ps.get_params.remote()  # This returns a future.
>>> params_id
ObjectID(7268cb8d345ef26632430df6f18cc9690eb6b300)
Actor method invocations return Futures. To retrieve the actual values, we use the blocking ray.get(id) call:
>>> ray.get(params_id)  # This blocks until the task finishes.
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
As we expect, the initial parameter values are all zeros. What ray.get(id) actually does is pull the value out of the distributed state store service that Ray provides. The value was written to the distributed state store by the actor when it updated its state. If the value and the client are both on the same machine, the value is pulled from shared memory for fast performance. If the value and client are resident on different machines, the value is pulled over to the machine that needs it.
For completeness, your code can also write values explicitly to this storage using ray.put(value), which returns an ID you can use to retrieve the object later. When you want to retrieve several values as they become available, there is a convenient ray.wait(…) function. See the Ray API for more details.
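Here is a brief sketch of both calls in action, continuing the same session; the slow_square task is purely illustrative:
# ray.put stores a value and returns an ID for later retrieval.
value_id = ray.put([1, 2, 3])
assert ray.get(value_id) == [1, 2, 3]

@ray.remote
def slow_square(x):
  import random, time
  time.sleep(random.random())
  return x * x

# ray.wait returns the IDs of finished tasks as they complete.
pending = [slow_square.remote(i) for i in range(4)]
while pending:
  ready, pending = ray.wait(pending, num_returns=1)
  print(ray.get(ready[0]))  # Prints squares in completion order.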
Following the actor model, when clients invoke these actor methods, the invocations are routed to the actor instance, wherever it may be in the cluster. Since concurrent invocations can occur, Ray ensures that each invocation is processed in a thread-safe way, so the state cannot be corrupted, without the need for explicit thread synchronization code. However, this doesn't impose any global ordering on when these invocations are processed; it is first come, first served.
Now, suppose we want to run several worker tasks that continuously compute gradients and update the model parameters. Each worker will run in a loop that does the following three things:
  1. Get the latest parameters.
  2. Compute an update to the parameters.
  3. Update the parameters.
These workers will be stateless, so we'll use a Ray task (a remote function) instead of an actor:
import time

# The worker function takes a handle to the parameter server as
# an argument, which allows the worker task to invoke methods
# on the parameter server actor.
@ray.remote
def worker(ps):
  for _ in range(100):  # Arbitrary number (100) of updates
    # First, get the latest parameters. The following
    # method call is non-blocking; it returns a future
    # effectively immediately.
    params_id = ps.get_params.remote()

    # As before, this is a blocking call that waits for
    # the task to finish and then gets the value.
    params = ray.get(params_id)

    # Compute a gradient update. We make a fake update,
    # but in practice this would use an ML library like
    # TensorFlow and would also take in a batch of data.
    # We'll simulate an expensive calculation by adding
    # a sleep call.
    grad = np.ones(10)
    time.sleep(0.2)

    # Update all the parameters with the same gradient.
    ps.update_params.remote(grad)
Then we can start two of these worker tasks as follows. Ray tasks (functions) are started with the same remote() invocation:
for _ in range(2):
  worker.remote(ps)
Then we can retrieve the parameters from the driver process repeatedly and see that they are being updated by the workers:
>>> ray.get(ps.get_params.remote())
array([64., 64., 64., 64., 64., 64., 64., 64., 64., 64.])
>>> ray.get(ps.get_params.remote())
array([78., 78., 78., 78., 78., 78., 78., 78., 78., 78.])
...
When the updates stop, the final values will be 200: each of the two workers adds a gradient of 1 to every parameter 100 times.
Note that Ray makes it as easy to start up a remote service or actor as it is to define a Python class. Handles to the actor can be passed around to other actors and tasks to allow arbitrary and intuitive messaging and communication patterns. Current alternatives are much more involved. For example, consider how the equivalent runtime service creation and service handle passing would be done with gRPC, as in this documentation.

Unifying Tasks and Actors

We’ve seen that tasks and actors use the same Ray API and are used the same way. This unification of parallel tasks and actors has important benefits, both for simplifying the use of Ray and for building powerful applications through composition.
By way of comparison, popular data processing systems such as Apache Hadoop and Apache Spark allow stateless tasks (functions with no side effects) to operate on immutable data. This assumption simplifies the overall system design and makes it easier for applications to reason about correctness.
However, shared mutable state is common in machine learning applications. That state could be the weights of a neural network, the state of a third-party simulator, or a representation of interactions with the physical world. Ray’s actor abstraction provides an intuitive way to define and manage mutable state in a thread-safe way.
What makes this especially powerful is the way Ray unifies tasks and actors, inheriting the benefits of both approaches. Ray uses an underlying dynamic task graph to implement both actors and stateless tasks in the same framework. As a consequence, these two abstractions are completely interoperable. Tasks and actors can be created from within other tasks and actors. Both return futures, which can be passed into other tasks or actor methods to introduce scheduling and data dependencies in a natural way. As a result, Ray applications inherit the best features of both tasks and actors.
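To make this concrete, here is a small sketch (the task and actor are illustrative): a future returned by a task can be passed directly to an actor method, and Ray schedules the method only after the task completes, delivering the resolved value.
import numpy as np
import ray

ray.init()

@ray.remote
def create_data():
  # A stateless task that produces an array.
  return np.ones(10)

@ray.remote
class Accumulator(object):
  def __init__(self):
    self.total = np.zeros(10)

  def add(self, values):
    # values arrives as the resolved array, not a future.
    self.total += values
    return self.total

acc = Accumulator.remote()
data_future = create_data.remote()
# Passing the future creates a data dependency: add runs only
# after create_data finishes.
total_future = acc.add.remote(data_future)
print(ray.get(total_future))  # An array of ones.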
Here are some of the core concepts used internally by Ray:
Dynamic Task Graphs: When you invoke a remote function or actor method, tasks are added to a dynamically growing graph, which Ray schedules and executes across a cluster (or a single multi-core machine). Tasks can be created by the “driver” application or by other tasks.
Data: Ray efficiently serializes data using the Apache Arrow data layout. Objects are shared between workers and actors on the same machine through shared memory, which avoids the need for copies or deserialization. This optimization is critical for achieving good performance (see the sketch after these concepts).
Scheduling: Ray uses a distributed scheduling approach. Each machine has its own scheduler, which manages the workers and actors on that machine. Tasks are submitted by applications and workers to the scheduler on the same machine. From there, they can be reassigned to other workers or passed to other local schedulers. This allows Ray to achieve substantially higher task throughput than what can be achieved with a centralized scheduler, a potential bottleneck and single point of failure. This is essential for many machine learning applications.
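Here is the sketch promised above for the Data point. In typical Ray versions, a NumPy array fetched from the object store on the same machine is a zero-copy, read-only view backed by shared memory (the exact behavior can vary by version):
import numpy as np
import ray

ray.init()

# Store a large array in the object store once.
array_id = ray.put(np.zeros(10**6))

# Fetching it on the same machine yields a view backed by
# shared memory rather than a copy.
view = ray.get(array_id)
print(view.flags.writeable)  # Typically False.

# Copy before mutating.
local = view.copy()
local[0] = 1.0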

Conclusion

Systems like parameter servers are normally implemented and shipped as standalone systems with a nontrivial amount of code, much of which is boilerplate for handling distribution, invocation, state management, etc. We've seen that Ray's abstractions and features make it possible to eliminate most of that boilerplate, which makes feature enhancements comparatively easy and keeps your productivity high.
Many of the common services we need in today’s production environments can be implemented this way, quickly and efficiently. Examples include logging, stream processing, simulation, model serving, graph processing, and many others.
I hope you found this brief introduction to Ray intriguing. Please give it a try and let me know what you think!

To Learn More

For more information about Ray, see the Ray website and the GitHub repository.
Questions should be directed to the Ray Slack workspace or to the ray-dev Google Group.

Appendix: Running the Code

To run the complete application, first install Ray with pip install ray (or see the Ray installation instructions). Then run the following code with Python. It implements the parameter server as discussed previously, but adds sharding of the parameters across multiple parameter server actors. You can also find a more extensive version of this example as a Jupyter notebook.
import numpy as np
import ray
import time

# Start Ray.
ray.init()

@ray.remote
class ParameterServer(object):
  def __init__(self, dim):
    self.params = np.zeros(dim)

  def get_params(self):
    return self.params

  def update_params(self, grad):
    self.params += grad
@ray.remote
def sharded_worker(*parameter_servers):
  for _ in range(100):
    # Get the latest parameters.
    parameter_shards = ray.get(
      [ps.get_params.remote() for ps in parameter_servers])
    params = np.concatenate(parameter_shards)

    # Compute a gradient update as before in `worker`, but
    # with additional logic for sharding.
    grad = np.ones(10)
    # A placeholder for some expensive computation:
    time.sleep(0.2)
    grad_shards = np.split(grad, len(parameter_servers))

    # Send the gradient updates to the parameter servers.
    for ps, grad in zip(parameter_servers, grad_shards):
      ps.update_params.remote(grad)
# Start two parameter servers, each with half of the parameters.
parameter_servers = [ParameterServer.remote(5) for _ in range(2)]

# Start 2 workers.
workers = [
  sharded_worker.remote(*parameter_servers) for _ in range(2)]

# Inspect the parameters at regular intervals until we've
# reached the end (i.e., each parameter equals 200).
while True:
  time.sleep(1)
  results = ray.get(
    [ps.get_params.remote() for ps in parameter_servers])
  print(results)
  if results[0][0] >= 200:
    break
Note that this example focuses on simplicity and that more can be done to optimize this code.