Skip to content

bivariate_quantiles module

This takes in observations of a bivariate random variable y and computes vector quantiles and vector ranks à la Chernozhukov-Galichon-Hallin-Henry (Ann. Stats. 2017).

Note

if the math looks strange in the documentation, just reload the page.

The sequence of steps is as follows:

  • choose a number of Chebyshev nodes for numerical integration and optimize the weights: v = solve_for_v(y, n_nodes)
  • to obtain the \((u_1,u_2)\) quantiles for \((u_1, u_2)\in [0,1]\), run qtiles_y = bivariate_quantiles_v(y, v, u1, u2)
  • to compute the vector ranks for all points in the sample (the barycenters of the cells in the power diagram): ranks_y = bivariate_ranks_v(y, v, n_nodes)

Steps 1 and 2 can be combined: qtiles_y = bivariate_quantiles(y, v, u1, u2, n_nodes)

Steps 1 and 3 can be combined: ranks_y = bivariate_ranks(y, n_nodes)

bivariate_quantiles(y, u, n_nodes=32, verbose=False)

Solve for the dual weights then evaluate bivariate quantiles.

Parameters:

Name Type Description Default
y ndarray

Observations, shape (n, 2).

required
u ndarray

Query points in [0, 1]^2 (shape (m, 2)).

required
n_nodes int

Number of Chebyshev nodes for the quadrature.

32
verbose bool

Print optimisation diagnostics when True.

False

Returns:

Type Description
ndarray

Bivariate quantiles at u.

Source code in bs_python_utils/bivariate_quantiles.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def bivariate_quantiles(
    y: np.ndarray, u: np.ndarray, n_nodes: int = 32, verbose: bool = False
) -> np.ndarray:
    """Solve for the dual weights then evaluate bivariate quantiles.

    Args:
        y: Observations, shape ``(n, 2)``.
        u: Query points in ``[0, 1]^2`` (shape ``(m, 2)``).
        n_nodes: Number of Chebyshev nodes for the quadrature.
        verbose: Print optimisation diagnostics when ``True``.

    Returns:
        Bivariate quantiles at ``u``.
    """
    v = solve_for_v_(y, n_nodes, verbose)
    return bivariate_quantiles_v(y, u, v)

bivariate_quantiles_v(y, u, v)

Evaluate vector quantiles for a given set of dual weights.

Parameters:

Name Type Description Default
y ndarray

Observations with shape (n, 2).

required
u ndarray

Evaluation points in [0, 1]^2 (shape (m, 2)).

required
v ndarray

Dual weights solving the optimal transport problem (length n).

required

Returns:

Type Description
ndarray

Array of quantile locations with shape (m, 2).

Source code in bs_python_utils/bivariate_quantiles.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def bivariate_quantiles_v(y: np.ndarray, u: np.ndarray, v: np.ndarray) -> np.ndarray:
    """Evaluate vector quantiles for a given set of dual weights.

    Args:
        y: Observations with shape ``(n, 2)``.
        u: Evaluation points in ``[0, 1]^2`` (shape ``(m, 2)``).
        v: Dual weights solving the optimal transport problem (length ``n``).

    Returns:
        Array of quantile locations with shape ``(m, 2)``.
    """
    u = np.atleast_2d(u)
    if u.shape[1] != 2:
        bs_error_abort("u must have two columns")
    m = u.shape[0]
    q = np.empty((m, 2))
    block = max(1, min(m, 5_000))
    for start in range(0, m, block):
        stop = min(start + block, m)
        chunk = u[start:stop]
        net_val = chunk @ y.T - v
        k_max = np.argmax(net_val, axis=1)
        q[start:stop] = y[k_max]
    return cast(np.ndarray, q)

bivariate_ranks(y, n_nodes=32, verbose=False)

Compute ranks by first solving for the optimal weights v.

Parameters:

Name Type Description Default
y ndarray

Observations, shape (n, 2).

required
n_nodes int

Number of Chebyshev nodes for the quadrature.

32
verbose bool

Print optimisation diagnostics when True.

False

Returns:

Type Description
ndarray

Average ranks with shape (n, 2).

Source code in bs_python_utils/bivariate_quantiles.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def bivariate_ranks(
    y: np.ndarray, n_nodes: int = 32, verbose: bool = False
) -> np.ndarray:
    """Compute ranks by first solving for the optimal weights ``v``.

    Args:
        y: Observations, shape ``(n, 2)``.
        n_nodes: Number of Chebyshev nodes for the quadrature.
        verbose: Print optimisation diagnostics when ``True``.

    Returns:
        Average ranks with shape ``(n, 2)``.
    """
    v = solve_for_v_(y, n_nodes, verbose)
    return bivariate_ranks_v(y, v, n_nodes)

bivariate_ranks_v(y, v, n_nodes=32, presorted=False)

Compute the barycentric ranks of each observation given optimal weights.

Parameters:

Name Type Description Default
y ndarray

Observations with shape (n, 2).

required
v ndarray

Dual weights returned by solve_for_v_.

required
n_nodes int

Number of Chebyshev nodes used in the quadrature.

32
presorted bool

Set to True when y/v are pre-sorted by the second coordinate.

False

Returns:

Type Description
ndarray

Array of average ranks (shape (n, 2)) with nan for zero-mass cells.

Source code in bs_python_utils/bivariate_quantiles.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def bivariate_ranks_v(
    y: np.ndarray, v: np.ndarray, n_nodes: int = 32, presorted: bool = False
) -> np.ndarray:
    """Compute the barycentric ranks of each observation given optimal weights.

    Args:
        y: Observations with shape ``(n, 2)``.
        v: Dual weights returned by ``solve_for_v_``.
        n_nodes: Number of Chebyshev nodes used in the quadrature.
        presorted: Set to ``True`` when ``y``/``v`` are pre-sorted by the
            second coordinate.

    Returns:
        Array of average ranks (shape ``(n, 2)``) with ``nan`` for zero-mass cells.
    """
    n, d = y.shape

    if d != 2:
        bs_error_abort(f"only works for 2-dimensional y, not for {d}")

    interval01 = Interval(0.0, 1.0)
    u1_nodes, u1_weights = cheb_get_nodes_1d(interval01, n_nodes)

    if presorted:
        sort_order = np.arange(n)
        y_sorted = y
        v_sorted = v
    else:
        sort_order = np.argsort(y[:, 1])
        y_sorted = y[sort_order, :]
        v_sorted = v[sort_order]

    a_mat, b_mat = _compute_ab(y_sorted, v_sorted)

    average_ranks = np.zeros((n, 2))

    for k in range(n):
        left_bounds, right_bounds = _compute_u2_bounds(k, u1_nodes, a_mat, b_mat)
        pos_diffs = np.maximum(right_bounds - left_bounds, 0.0)
        pos_diffs_sq = np.maximum(
            right_bounds * right_bounds - left_bounds * left_bounds, 0.0
        )
        prob_k = pos_diffs @ u1_weights
        if prob_k <= 1e-12:
            average_ranks[sort_order[k], :] = np.array([np.nan, np.nan])
            continue
        average_ranks[sort_order[k], 0] = ((u1_nodes * pos_diffs) @ u1_weights) / prob_k
        average_ranks[sort_order[k], 1] = ((pos_diffs_sq @ u1_weights) / 2.0) / prob_k

    return average_ranks

solve_for_v_(y, n_nodes=32, verbose=False)

Solve the dual optimisation to obtain the optimal weights v.

Parameters:

Name Type Description Default
y ndarray

Observations with shape (n, 2).

required
n_nodes int

Number of Chebyshev nodes for the quadrature.

32
verbose bool

Print optimisation diagnostics when True.

False

Returns:

Type Description
ndarray

Array of length n containing the optimal weights (including the residual term).

Source code in bs_python_utils/bivariate_quantiles.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def solve_for_v_(y: np.ndarray, n_nodes: int = 32, verbose: bool = False) -> np.ndarray:
    """Solve the dual optimisation to obtain the optimal weights ``v``.

    Args:
        y: Observations with shape ``(n, 2)``.
        n_nodes: Number of Chebyshev nodes for the quadrature.
        verbose: Print optimisation diagnostics when ``True``.

    Returns:
        Array of length ``n`` containing the optimal weights (including the
            residual term).
    """
    n, d = y.shape

    if d != 2:
        bs_error_abort(f"only works for 2-dimensional y, not for {d}")

    # sort by increasing y[:, 1]
    sort_order = np.argsort(y[:, 1])
    y_sorted = y[sort_order, :]

    v0 = np.mean(y_sorted[:-1, :], 1)

    interval01 = Interval(0.0, 1.0)
    u1_nodes, u1_weights = cheb_get_nodes_1d(interval01, n_nodes)

    argsog = [y_sorted, u1_nodes, u1_weights, verbose]

    res = minimize_free(_obj, _grad, v0, args=argsog)
    if verbose:
        print_optimization_results(res, "Minimizing over v")

    if not res.success:
        bs_error_abort("Problem! the optimization failed.")
    vstar = res.x
    if verbose:
        print(f"The final gradient over v is close to 0: error {npmaxabs(res.jac)}")
    vstar1_sorted = np.append(vstar, -np.sum(vstar))

    # revert to original order
    vstar1 = np.zeros_like(vstar1_sorted)
    vstar1[sort_order] = vstar1_sorted

    return vstar1