# `Nebulex.Adapters.Partitioned`
[🔗](https://github.com/elixir-nebulex/nebulex_distributed/blob/v3.2.3/lib/nebulex/adapters/partitioned.ex#L1)

Adapter module for the partitioned cache topology.

## Features

  * Partitioned cache topology (Sharding Distribution Model).
  * Consistent hashing via `ExHashRing` for distributing keys across cluster
    nodes.
  * Automatic cluster membership management using Erlang's `:pg`
    (process groups).
  * Support for distributed transactions via `Nebulex.Distributed.Transaction`,
    using Erlang's `:global` module for cluster-wide lock coordination. See
    `Nebulex.Distributed.Transaction` for transaction options and examples.
  * Configurable primary storage adapter.

## Partitioned Cache Topology

There are several key points to consider about a partitioned cache:

  * _**Partitioned**_: The data in a distributed cache is spread out over
    all the servers in such a way that no two servers are responsible for
    the same piece of cached data. This means that the size of the cache
    and the processing power associated with the management of the cache
    can grow linearly with the size of the cluster. Also, it means that
    operations against data in the cache can be accomplished with a
    "single hop," in other words, involving at most one other server.

  * _**Load-Balanced**_: Since the data is spread out evenly over the
    servers, the responsibility for managing the data is automatically
    load-balanced across the cluster.

  * _**Ownership**_: Exactly one node in the cluster is responsible for each
    piece of data in the cache.

  * _**Point-To-Point**_: The communication for the partitioned cache is all
    point-to-point, enabling linear scalability.

  * _**Location Transparency**_: Although the data is spread out across
    cluster nodes, the exact same API is used to access the data, and the
    same behavior is provided by each of the API methods. This is called
    location transparency, which means that the developer does not have to
    code based on the topology of the cache, since the API and its behavior
    will be the same with a local cache, a replicated cache, or a distributed
    cache.

  * _**Failover**_: Failover of a distributed cache involves promoting backup
    data to primary storage. When a cluster node fails, the remaining cluster
    nodes determine which backup data they hold for the data the failed node
    was primarily responsible for, and that data becomes the responsibility
    of whichever node held the backup. However, this adapter does not
    implement fault tolerance: each piece of data is kept on a single node
    (via sharding), so if a node fails, the data it held is no longer
    available to the rest of the cluster members.

> Based on **"Distributed Caching Essential Lessons"** by **Cameron Purdy**
  and [Coherence Partitioned Cache Service][oracle-pcs].

[oracle-pcs]: https://docs.oracle.com/cd/E13924_01/coh.340/e13819/partitionedcacheservice.htm

```asciidoc
                              ┌──────────────┐
                              │    Client    │
                              └──────┬───────┘
                                     │
                              ┌──────┴───────┐
                              │  Hash Ring   │
                              └──────┬───────┘
                                     │
              ┌──────────────────────┼──────────────────────┐
              │                      │                      │
        ┌─────┴────────┐       ┌─────┴────────┐       ┌─────┴────────┐
        │    Node A    │       │    Node B    │       │    Node C    │
        │ Local Cache  │       │ Local Cache  │       │ Local Cache  │
        └──────────────┘       └──────────────┘       └──────────────┘
```

## Consistent Hashing and Key Distribution

The adapter uses `ExHashRing` to implement consistent hashing, which maps keys
to nodes in a way that minimizes data redistribution when the cluster topology
changes.

### How key distribution works

The process is as follows:

  1. **Virtual Nodes (Vnodes)**: Each physical node in the cluster is assigned
    a set of virtual nodes (vnodes) in the hash ring. This enables even
    distribution of keys across the cluster.

  2. **Key Hashing**: When a key is accessed, its hash value (computed using
    `:erlang.phash2/1`) is used to find the corresponding vnode in the ring.

  3. **Node Lookup**: `ExHashRing.Ring` finds the node responsible for that
    vnode, which becomes the target for the operation.

  4. **RPC Routing**: The request is sent to the target node via RPC (remote
    procedure call) to read or write the cached value.
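
Steps 1 to 3 above can be illustrated with a small, self-contained sketch. It is **not** how `ExHashRing` is implemented: the module name `RingSketch`, the 64 vnodes per node, and the 2^32 hash range are assumptions chosen for illustration, and the lookup is a linear scan rather than the O(log n) search a real ring performs. Step 4 (the RPC to the owning node) is left out.

```elixir
defmodule RingSketch do
  # Hypothetical consistent-hashing ring for illustration only; not ExHashRing.
  # 2^32 is the largest range accepted by :erlang.phash2/2.
  @hash_range 4_294_967_296

  @doc "Builds a ring where every node owns `vnodes` points, sorted by hash."
  def new(nodes, vnodes \\ 64) do
    # Step 1: place each physical node on the ring many times (vnodes)
    points =
      for node <- nodes, i <- 1..vnodes do
        {:erlang.phash2({node, i}, @hash_range), node}
      end

    Enum.sort(points)
  end

  @doc "Returns the node that owns `key` (the ring must not be empty)."
  def find_node(ring, key) do
    # Step 2: hash the key onto the same range as the vnodes
    hash = :erlang.phash2(key, @hash_range)

    # Step 3: the first vnode at or after the key's hash owns the key;
    # wrap around to the first vnode when the hash is past the last one
    case Enum.find(ring, fn {point, _node} -> point >= hash end) do
      {_point, node} -> node
      nil -> ring |> hd() |> elem(1)
    end
  end
end
```

Because each node's vnode positions depend only on the node name, removing a node leaves every other node's vnodes in place, so in this sketch only the keys owned by the removed node move to another node.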

### Benefits of consistent hashing

  * **Minimal Key Redistribution**: When nodes join or leave, only a fraction
    of keys move to other nodes (roughly proportional to the change in
    cluster size).
  * **Even Distribution**: With enough vnodes per node, keys are spread
    roughly evenly across all nodes in the cluster.
  * **Predictable Mapping**: As long as the ring membership does not change,
    the same key always maps to the same node, so every node routes a given
    key to the same place and cache hits are preserved across the cluster.
  * **Efficient Lookup**: Hash ring lookups are O(log n), where n is the
    number of vnodes in the ring.

## Cluster Membership Management

The adapter maintains a distributed view of the hash ring across all cluster
nodes using two key components:

### Process Groups (`:pg`)

The adapter uses Erlang's built-in `:pg` (process groups) module to track
cluster membership. When a partitioned cache is started:

  1. The cache supervisor PID is registered in a `:pg` group named after the
    cache (e.g., the `:name` option or the cache module name).
  2. All nodes running the same cache join the same group.
  3. When a node joins or leaves the cluster, `:pg` automatically notifies all
    processes subscribed to (monitoring) that group.
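
The membership steps above map onto the public `:pg` API (OTP 23+). The following single-node sketch shows the idea; the module `PgMembershipSketch`, the scope `:cache_scope`, and the group `:my_cache` are hypothetical placeholders, not the names Nebulex uses internally.

```elixir
defmodule PgMembershipSketch do
  # Hypothetical scope and group names; Nebulex's internal names may differ.
  @scope :cache_scope
  @group :my_cache

  @doc "Starts the :pg scope (normally once per node, under a supervisor)."
  def start_scope, do: :pg.start_link(@scope)

  @doc "Registers `pid` (e.g. the cache supervisor) as a member of the cache group."
  def join(pid), do: :pg.join(@scope, @group, pid)

  @doc "Lists the distinct nodes that currently have a member in the group."
  def nodes do
    @scope
    |> :pg.get_members(@group)
    |> Enum.map(&node/1)
    |> Enum.uniq()
  end
end
```

On a real cluster, each node would join its local cache supervisor, and `nodes/0` would list every node running the cache, because `:pg` shares group membership among all connected nodes that started the same scope.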

### Ring Monitor

The `Nebulex.Distributed.RingMonitor` is a `GenServer` that:

  1. **Subscribes to Cluster Changes**: Uses
    `Nebulex.Distributed.Cluster.monitor_scope/0` to subscribe to all
    `:pg` group changes via `:pg.monitor_scope/1`.

  2. **Handles Join/Leave Events**: When nodes join or leave a group,
    RingMonitor receives `{:join, group, pids}` and `{:leave, group, pids}`
    messages and updates the `ExHashRing.Ring` state accordingly.

  3. **Maintains Ring Consistency**: Keeps the hash ring in sync with the
    current cluster topology by adding/removing nodes from the ring.
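
A stripped-down version of this monitoring loop can be written with `:pg.monitor_scope/1` (available since OTP 25.1). This is a hypothetical sketch, not `Nebulex.Distributed.RingMonitor`: the module name `RingMonitorSketch` is made up, it keeps a plain list of member pids instead of an `ExHashRing.Ring`, and it matches the raw `:pg` notifications, which carry the monitor reference as their first element (`{ref, :join, group, pids}`).

```elixir
defmodule RingMonitorSketch do
  use GenServer

  # Hypothetical sketch: tracks which nodes run the cache by watching a :pg scope.
  def start_link({scope, group}), do: GenServer.start_link(__MODULE__, {scope, group})

  @doc "Returns the distinct nodes currently in the sketched ring."
  def nodes(pid), do: GenServer.call(pid, :nodes)

  @impl true
  def init({scope, group}) do
    # Subscribe to all joins/leaves in the scope and read the current members
    {ref, groups} = :pg.monitor_scope(scope)
    {:ok, %{ref: ref, group: group, members: Map.get(groups, group, [])}}
  end

  @impl true
  def handle_call(:nodes, _from, state) do
    nodes = state.members |> Enum.map(&node/1) |> Enum.uniq()
    {:reply, nodes, state}
  end

  @impl true
  def handle_info({ref, :join, group, pids}, %{ref: ref, group: group} = state) do
    # A real monitor would add the new nodes to the hash ring here
    {:noreply, %{state | members: state.members ++ pids}}
  end

  def handle_info({ref, :leave, group, pids}, %{ref: ref, group: group} = state) do
    # A real monitor would remove nodes that have no members left here
    {:noreply, %{state | members: state.members -- pids}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```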

### Handling Race Conditions During Startup

During initial cluster formation, multiple nodes may start simultaneously,
leading to race conditions where some nodes miss join events from others.
To solve this, the RingMonitor uses a **periodic rejoin mechanism**:

  * **Rejoin Interval**: The `:rejoin_interval` option (default: 30 seconds)
    specifies an interval at which the RingMonitor periodically rejoins
    the `:pg` group.

  * **Idempotent Joins**: Since `:pg` treats duplicate joins as idempotent, a
    node can safely rejoin without negative side effects.

  * **Forced Ring Updates**: Each periodic rejoin triggers `:join` events that
    force all nodes to update their ring view, ensuring eventual consistency
    even if some initial join events were missed.

This mechanism ensures that all nodes have a consistent view of the ring, even
in the face of concurrent startups or transient network issues.
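
The periodic rejoin can be sketched as a small `GenServer` that joins a `:pg` group on start and then rejoins every `:rejoin_interval` milliseconds using `Process.send_after/3`. This is a hypothetical illustration, not the RingMonitor's code: the module name `RejoinSketch` and the `:scope`/`:group` options are assumptions. Plain `:pg` counts repeated joins of the same pid (a process that joins twice must leave twice), so this sketch leaves the group before joining again to keep exactly one membership.

```elixir
defmodule RejoinSketch do
  use GenServer

  # Hypothetical sketch of a periodic :pg rejoin; not Nebulex's RingMonitor.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      scope: Keyword.fetch!(opts, :scope),
      group: Keyword.fetch!(opts, :group),
      interval: Keyword.get(opts, :rejoin_interval, 30_000)
    }

    # Initial join: monitors of the scope receive a :join notification
    :ok = :pg.join(state.scope, state.group, self())
    schedule_rejoin(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:rejoin, state) do
    # Leave first so the membership count stays at one, then join again to
    # broadcast a fresh :join event for monitors that missed the first one.
    :ok = :pg.leave(state.scope, state.group, self())
    :ok = :pg.join(state.scope, state.group, self())
    schedule_rejoin(state.interval)
    {:noreply, state}
  end

  defp schedule_rejoin(interval), do: Process.send_after(self(), :rejoin, interval)
end
```

One trade-off of this leave-then-join approach is that monitors briefly observe a `:leave` before the new `:join`; a production implementation has to decide how to treat that transient event.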

## Primary Storage Adapter

This adapter depends on a local cache adapter (primary storage), adding a
distributed layer on top of it. You don't need to manually define the primary
storage cache; the adapter initializes it automatically as part of the
supervision tree.

The `:primary_storage_adapter` option (defaults to `Nebulex.Adapters.Local`)
configures which adapter to use for the local storage on each node. Options
for the primary adapter can be specified via the `:primary` configuration option.

## Usage

The cache expects the `:otp_app` and `:adapter` as options when used.
The `:otp_app` should point to an OTP application with the cache
configuration. Optionally, you can configure the desired primary
storage adapter with the option `:primary_storage_adapter`
(defaults to `Nebulex.Adapters.Local`). See the compile time options
for more information:

* `:primary_storage_adapter` (`t:atom/0`) - The adapter module used for the primary (local) storage on each cluster
  node. The partitioned adapter is a distributed wrapper that routes
  requests to the appropriate node based on consistent hashing. The actual
  data storage is handled by the primary storage adapter on each node.
  This option allows you to choose which adapter to use for this local
  storage. The configuration for the primary adapter is specified via the
  `:primary` start option. The default value is `Nebulex.Adapters.Local`.

For example:

    defmodule MyApp.PartitionedCache do
      use Nebulex.Cache,
        otp_app: :my_app,
        adapter: Nebulex.Adapters.Partitioned
    end

Providing the `:primary_storage_adapter`:

    defmodule MyApp.PartitionedCache do
      use Nebulex.Cache,
        otp_app: :my_app,
        adapter: Nebulex.Adapters.Partitioned,
        adapter_opts: [primary_storage_adapter: Nebulex.Adapters.Local]
    end

The cache configuration must be placed in your application environment,
usually in your `config/config.exs`:

    config :my_app, MyApp.PartitionedCache,
      primary: [
        gc_interval: :timer.hours(12),
        gc_memory_check_interval: :timer.seconds(10),
        max_size: 1_000_000,
        allocated_memory: 2_000_000_000
      ]

If your application was generated with a supervisor (by passing `--sup`
to `mix new`), you will have a `lib/my_app/application.ex` file containing
the application start callback that defines and starts your supervisor.
You just need to edit the `start/2` function to add the cache as a child
of your application's supervisor:

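    def start(_type, _args) do
      children = [
        {MyApp.PartitionedCache, []},
        # ... other children
      ]

      opts = [strategy: :one_for_one, name: MyApp.Supervisor]
      Supervisor.start_link(children, opts)
    end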

See `Nebulex.Cache` for more information.

## Configuration options

This adapter supports the following configuration options:

* `:primary` (`t:keyword/0`) - Configuration options passed to the primary storage adapter specified via
  `:primary_storage_adapter`. The available options depend on which adapter
  you choose. Refer to the documentation of your chosen primary storage
  adapter for the complete list of supported options. The default value is `[]`.

* `:hash_ring` (`t:keyword/0`) - Configuration options for the consistent hash ring used to distribute keys
  across cluster nodes. The hash ring maps each key to a node using virtual
  nodes (vnodes), which enables:

    * Minimal key redistribution when nodes join or leave.
    * Even distribution of keys across nodes.
    * Efficient node lookup for cache operations.

  See [`ExHashRing.Ring.start_link/2`][ex_hash_ring] for the complete list
  of supported options.

  [ex_hash_ring]: https://hexdocs.pm/ex_hash_ring/ExHashRing.Ring.html#start_link/1

  The default value is `[]`.

* `:rejoin_interval` (`t:timeout/0`) - The interval in **milliseconds** at which the `RingMonitor` periodically
  rejoins the `:pg` group to force ring synchronization across all cluster
  nodes.

  **Purpose:** This mechanism helps handle race conditions during concurrent
  node startups by ensuring all nodes eventually have a consistent view of
  the hash ring. Even if some join events are missed during initial cluster
  formation, each rejoin triggers new notifications that force ring updates.

  **Trade-offs:**

    * **Shorter intervals** (e.g., 10 seconds):
      - Faster consistency convergence.
      - More overhead from frequent rejoin events and notifications.
      - Better for highly dynamic clusters with frequent node changes.

    * **Longer intervals** (e.g., 60 seconds):
      - Lower overhead and reduced network traffic.
      - Slower eventual consistency.
      - Fine for stable clusters that don't change frequently.

  **Default (30 seconds):** Works well for most use cases, balancing
  consistency and overhead.

  The default value is `30000`.

* `:node_filter` (function of arity 1) - An optional 1-arity function that filters which cluster nodes are
  added to the hash ring. The function receives a node name
  (`t:node/0`) and must return `true` to include the node in the
  ring, or `false` to exclude it. Only nodes present in the ring
  will be selected to cache data.

  Excluded nodes are still part of the cache cluster (`:pg` group),
  so the cache remains fully usable from them — reads, writes, and
  all other operations work transparently, routing to ring nodes
  as usual.

  By default, all nodes that join the cluster are added to the
  hash ring.

  > #### Function Captures {: .info}
  >
  > Due to how anonymous functions are implemented in the Erlang VM,
  > it is best to use function captures (`&Mod.fun/1`) as node filters.
  > In other words, avoid using literal anonymous functions
  > (`fn ... -> ... end`) or local function captures (`&my_filter/1`)
  > as they cannot be serialized across distributed nodes.

  See the ["Node Filter"](`Nebulex.Adapters.Partitioned#module-node-filter`)
  section for more information and examples.
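
As an illustration, the options above can be set in the application environment next to `:primary`. The values below are placeholders for this sketch, not recommendations.

```elixir
# config/config.exs (sketch; values are illustrative only)
import Config

config :my_app, MyApp.PartitionedCache,
  # Rejoin the :pg group every 10 seconds instead of the default 30 seconds
  rejoin_interval: :timer.seconds(10),
  # Options forwarded to the primary storage adapter (Nebulex.Adapters.Local by default)
  primary: [
    gc_interval: :timer.hours(12),
    max_size: 1_000_000
  ]
```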

## Shared runtime options

When using the partitioned adapter, all of the cache functions outlined in
`Nebulex.Cache` accept the following options:

* `:timeout` (`t:timeout/0`) - The time in **milliseconds** to wait for a cache command to finish.

  This timeout applies to RPC calls made to remote nodes during partitioned
  cache operations. Since the partitioned adapter routes requests across
  cluster nodes, network latency and node load affect execution time.

  Set to `:infinity` to wait indefinitely. If a timeout occurs, the
  operation fails with an error. Note that the underlying cache operation
  may still complete on the remote node asynchronously.

  The default value is `5000`.

### Stream options

The `stream` command supports the following options:

* `:on_error` (`:raise` | `:nothing`) - Controls error handling during stream evaluation across cluster nodes.

  When streaming entries from a partitioned cache, the adapter evaluates the
  stream on each cluster node. Since this involves RPC calls to remote
  nodes, failures can occur due to:

    * Network issues or RPC timeouts.
    * Errors on the remote node.
    * Temporary node unavailability.

  **Options:**

    * `:raise` (default) - Raise an exception immediately when an error
      occurs on any node. The stream evaluation stops, and no further nodes
      are queried.

    * `:nothing` - Skip errors silently and continue. Returns only
      successful results from nodes that responded without errors.
      Useful for resilience in environments where temporary node
      failures are acceptable.

  The default value is `:raise`.

## Telemetry events

Since the partitioned adapter depends on the configured primary storage
cache (which uses a local cache adapter), the primary storage cache also
emits Telemetry events. Therefore, events are emitted by both the
partitioned adapter and the primary storage cache. For example, the
`MyApp.PartitionedCache` cache defined above emits the following events:

  * `[:my_app, :partitioned_cache, :command, :start]`
  * `[:my_app, :partitioned_cache, :primary, :command, :start]`
  * `[:my_app, :partitioned_cache, :command, :stop]`
  * `[:my_app, :partitioned_cache, :primary, :command, :stop]`
  * `[:my_app, :partitioned_cache, :command, :exception]`
  * `[:my_app, :partitioned_cache, :primary, :command, :exception]`

As shown above, the default telemetry prefix for the cache is
`[:my_app, :partitioned_cache]`, and the primary storage cache events are
nested under `[:my_app, :partitioned_cache, :primary]`. To override the
latter, specify the `:telemetry_prefix` within the `:primary` options. See the
[Telemetry guide](https://hexdocs.pm/nebulex/telemetry.html)
for more information and examples.

## Adapter-specific telemetry events

The RingMonitor process emits the following Telemetry events during the
lifetime of the partitioned cache:

  * `telemetry_prefix ++ [:ring_monitor, :started]` - Dispatched when the
    RingMonitor process starts.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom
      }
      ```

  * `telemetry_prefix ++ [:ring_monitor, :joined]` - Dispatched when the
    RingMonitor has successfully joined the `:pg` group to enter the cluster.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom
      }
      ```

  * `telemetry_prefix ++ [:ring_monitor, :nodes_added]` - Dispatched when
    nodes are added to the hash ring.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        nodes: [atom]
      }
      ```

  * `telemetry_prefix ++ [:ring_monitor, :nodes_removed]` - Dispatched when
    nodes are removed from the hash ring.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        nodes: [atom]
      }
      ```

  * `telemetry_prefix ++ [:ring_monitor, :exit]` - Dispatched when the
    RingMonitor receives an EXIT signal.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        reason: term
      }
      ```

  * `telemetry_prefix ++ [:ring_monitor, :stopped]` - Dispatched when the
    RingMonitor process terminates.

    * Measurements: `%{system_time: non_neg_integer}`
    * Metadata:

      ```
      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        reason: term
      }
      ```
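
A handler for the ring events can be attached with `:telemetry.attach_many/4`. The sketch below assumes the default prefix `[:my_app, :partitioned_cache]` described above and needs the `:telemetry` library at runtime; the module name `MyApp.RingTelemetry`, the handler id, and the log format are hypothetical.

```elixir
defmodule MyApp.RingTelemetry do
  # Hypothetical handler module for the RingMonitor events.
  require Logger

  @prefix [:my_app, :partitioned_cache]
  @events [
    @prefix ++ [:ring_monitor, :nodes_added],
    @prefix ++ [:ring_monitor, :nodes_removed]
  ]

  @doc "Attaches the handler, e.g. from your Application.start/2."
  def attach do
    :telemetry.attach_many("my-app-ring-monitor", @events, &__MODULE__.handle_event/4, nil)
  end

  @doc false
  def handle_event(event, _measurements, metadata, _config) do
    Logger.info(describe(event, metadata))
  end

  @doc "Builds a log line from the event name and its `:node`/`:nodes` metadata."
  def describe(event, %{node: node, nodes: nodes}) do
    "hash ring #{List.last(event)} on #{node}: #{inspect(nodes)}"
  end
end
```

Passing a remote capture (`&__MODULE__.handle_event/4`) rather than an anonymous function is the form `:telemetry` recommends for handlers.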

## Info API

As explained above, the partitioned adapter depends on the configured primary
storage adapter. Therefore, the information the `info` command provides will
depend on the primary storage adapter. The Nebulex built-in adapters support
the recommended keys `:server`, `:memory`, and `:stats`. Additionally, the
partitioned adapter supports:

  * `:nodes_info` - A map with the info for each node.
  * `:nodes` - A list with the cluster nodes.

For example, the info for `MyApp.PartitionedCache` may look like this:

    iex> MyApp.PartitionedCache.info!()
    %{
      memory: %{total: nil, used: 344600},
      server: %{
        cache_module: MyApp.PartitionedCache,
        cache_name: :partitioned_cache,
        cache_adapter: Nebulex.Adapters.Partitioned,
        cache_pid: #PID<0.1053.0>,
        nbx_version: "3.0.0"
      },
      stats: %{
        hits: 0,
        misses: 0,
        writes: 0,
        evictions: 0,
        expirations: 0,
        deletions: 0,
        updates: 0
      },
      nodes: [:"node1@127.0.0.1", ...],
      nodes_info: %{
        "node1@127.0.0.1": %{
          memory: %{total: nil, used: 68920},
          server: %{
            cache_module: MyApp.PartitionedCache.Primary,
            cache_name: MyApp.PartitionedCache.Primary,
            cache_adapter: Nebulex.Adapters.Local,
            cache_pid: #PID<23981.823.0>,
            nbx_version: "3.0.0"
          },
          stats: %{
            hits: 0,
            misses: 0,
            writes: 0,
            evictions: 0,
            expirations: 0,
            deletions: 0,
            updates: 0
          }
        },
        ...
      }
    }

## Extended API

This adapter provides some additional convenience functions to the
`Nebulex.Cache` API.

Retrieving the primary storage or local cache module:

    MyCache.__primary__()

Retrieving the cluster nodes associated with the given cache `name`:

    MyCache.nodes()

Getting the cluster node that owns the given `key`:

    MyCache.find_node("mykey")

    MyCache.find_node!("mykey")

Joining the cache to the cluster:

    MyCache.join_cluster()

Leaving the cluster (removes the cache from the cluster):

    MyCache.leave_cluster()

## Transactions

The partitioned adapter supports distributed transactions via
`Nebulex.Distributed.Transaction`, which uses Erlang's `:global` module for
cluster-wide lock coordination.

**Automatic Cluster Coordination**: Transactions automatically coordinate
locks across all nodes in the partitioned cache cluster. The adapter
internally determines which nodes participate in the cluster (via hash ring)
and ensures locks are acquired across all of them.

**Example**:

    # Transaction with fine-grained locking (recommended)
    MyCache.transaction(fn ->
      # Lock is coordinated across all cluster nodes
      counter = MyCache.get!(:counter, 0)
      MyCache.put!(:counter, counter + 1)
    end, keys: [:counter])

    # Multi-key transaction
    MyCache.transaction(fn ->
      alice = MyCache.get!(:alice)
      bob = MyCache.get!(:bob)

      MyCache.put!(:alice, %{alice | balance: alice.balance - 100})
      MyCache.put!(:bob, %{bob | balance: bob.balance + 100})
    end, keys: [:alice, :bob])

**Important**: Always specify the `:keys` option to enable fine-grained
locking and allow concurrent transactions on different keys. Without keys,
a global lock is used, which serializes all transactions across the entire
cluster and severely impacts performance.

**Cross-Partition Transactions**: When keys hash to different partitions
(nodes), the transaction still works correctly by acquiring locks across all
relevant nodes. However, keep in mind that cross-partition transactions are
more expensive due to the distributed coordination overhead.

See `Nebulex.Distributed.Transaction` for more information about transaction
options, behavior, retry policies, and performance considerations.
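
To give a feel for what per-key locking with `:global` involves, here is a minimal sketch built on `:global.set_lock/3` and `:global.del_lock/2`. It is not `Nebulex.Distributed.Transaction`: the module name `KeyLockSketch` and the `{cache, key}` resource ids are assumptions. Keys are sorted before locking so that transactions over overlapping keys always acquire locks in the same order, which avoids deadlocks.

```elixir
defmodule KeyLockSketch do
  @moduledoc false
  # Hypothetical per-key locking helper built on :global; not Nebulex's code.

  @doc """
  Runs `fun` while holding one cluster-wide lock per key in `keys`.
  Locks are identified by `{cache, key}`, so different keys never block each other.
  """
  def with_keys(cache, keys, fun, nodes \\ [node() | Node.list()]) do
    # Sort keys so every caller acquires overlapping locks in the same order
    lock_ids = keys |> Enum.sort() |> Enum.map(&{{cache, &1}, self()})

    # Retries = :infinity makes set_lock/3 keep retrying until each lock is held
    Enum.each(lock_ids, fn lock_id -> true = :global.set_lock(lock_id, nodes, :infinity) end)

    try do
      fun.()
    after
      # Always release the locks, even if fun raises
      Enum.each(lock_ids, &:global.del_lock(&1, nodes))
    end
  end
end
```

For a single resource, `:global.trans/4` wraps the same set-lock/run/release pattern in one call.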

## Node Filter

By default, every node that joins the cluster is added to the hash ring and
therefore is selected to cache data. The `:node_filter` option lets
you control which nodes are part of the ring.

This is useful when certain nodes in the cluster should act as **clients
only** — for example, background job runners, test nodes, or admin consoles
— that need to use the cache but should not store any data locally.

Excluded nodes are still part of the cache cluster, so the cache remains
fully usable from them — reads, writes, and all other operations work
transparently, routing to ring nodes as usual.

### Example

    # In your supervision tree
    children = [
      {MyApp.PartitionedCache, node_filter: &MyApp.NodeFilter.cache_node?/1},
      ...
    ]

    # In your application code
    defmodule MyApp.NodeFilter do
      # Only nodes whose name starts with "cache@" will be part of the hash ring
      def cache_node?(node) do
        node
        |> to_string()
        |> String.starts_with?("cache@")
      end
    end

## Caveats of partitioned adapter

For operations that receive anonymous functions as arguments, such as
`c:Nebulex.Cache.get_and_update/3`, `c:Nebulex.Cache.update/4`,
`c:Nebulex.Cache.fetch_or_store/3`, and `c:Nebulex.Cache.get_or_store/3`,
there's an important consideration: these anonymous functions are compiled
into the module where they are created. Since the partitioned adapter
executes operations on remote nodes, these functions may not exist on the
target nodes.

To ensure these operations work correctly in a distributed environment, you
must provide functions from modules that exist on all nodes in the cluster.
This can be achieved by:

* Using named functions from modules that are available across all nodes.
* Defining the functions in a shared module that's loaded on every node.
* Using remote function captures (`&Mod.fun/arity`), which reference the
  function by module, name, and arity and can be serialized and sent to
  other nodes.

Example of the recommended approach:

    # Instead of anonymous functions, use named functions from shared modules
    defmodule MyApp.CacheHelpers do
      # get_and_update/3 expects {value_to_return, new_value} (or :pop)
      def increment_value(current_value) do
        {current_value, (current_value || 0) + 1}
      end
    end

    # Use the named (remote) function capture in cache operations
    MyCache.get_and_update("counter", &MyApp.CacheHelpers.increment_value/1)
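
One way to see the difference is `:erlang.fun_info/2`: a remote capture such as `&MyApp.CacheHelpers.increment_value/1` is an *external* fun that only names a module, function, and arity, while an anonymous function is a *local* fun tied to the exact compiled version of the module that created it. The helper module `FunKind` below is hypothetical and only meant for inspecting functions in IEx.

```elixir
defmodule FunKind do
  @doc """
  Returns :external for remote captures (callable on any node that has the
  target module loaded) and :local for anonymous functions and local captures
  (which need the same module version on the remote node).
  """
  def kind(fun) when is_function(fun) do
    {:type, type} = :erlang.fun_info(fun, :type)
    type
  end
end

FunKind.kind(&String.upcase/1)
# => :external

FunKind.kind(fn value -> value end)
# => :local
```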

# `with_dynamic_cache`

Helper function to use dynamic cache for internal primary cache storage
when needed.

