Skip to content

bsstats module

Contains some statistical routines.

TslsResults dataclass

contains the full results of a TSLS regression

Source code in bs_python_utils/bsstats.py
23
24
25
26
27
28
29
30
31
32
33
34
@dataclass
class TslsResults:
    """contains the full results of a TSLS regression"""

    iv_estimates: float | np.ndarray | None
    r2_first_iv: float | np.ndarray | None
    r2_y: float | None
    r2_second: float | np.ndarray | None
    y_proj: float | np.ndarray | None
    y_coeffs: float | np.ndarray | None
    X_IV_proj: float | np.ndarray | None
    b_proj_IV: float | np.ndarray | None

bs_multivariate_normal_pdf(values_x, means_x, cov_mat)

Multivariate (or univariate) normal probability density function at values_x

Parameters:

Name Type Description Default
values_x ndarray

values at which to evaluate the pdf, an n-vector or an (n, nvars) matrix

required
means_x float | ndarray

means of the multivariate normal, a float or an (nvars) vector

required
cov_mat float | ndarray

covariance matrix of the multivariate normal, a float or an (nvars, nvars) matrix

required

Returns:

Type Description
ndarray

the values of the density at values_x

Source code in bs_python_utils/bsstats.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def bs_multivariate_normal_pdf(
    values_x: np.ndarray, means_x: float | np.ndarray, cov_mat: float | np.ndarray
) -> np.ndarray:
    """Multivariate (or univariate) normal probability density function at values_x

    Args:
        values_x: values at which to evaluate the pdf, an `n`-vector or an `(n, nvars)` matrix
        means_x: means of the multivariate normal, a float or an `(nvars)` vector
        cov_mat: covariance matrix of the multivariate normal, a float or an `(nvars, nvars)` matrix

    Returns:
        the values of the density at `values_x`
    """
    ndims_values = check_vector_or_matrix(values_x, "bs_multivariate_normal_pdf")
    if ndims_values == 1:  # we are evaluating a univariate normal
        # if not type(means_x) == float:
        #     bs_error_abort(f"means_x should be a float as values_x is a vector")
        # if not type(cov_mat) == float:
        #     bs_error_abort(f"cov_mat should be a float as values_x is a vector")
        sigma2 = cov_mat
        resid = values_x - means_x
        dval = np.exp(-0.5 * resid * resid / sigma2) / np.sqrt(2 * np.pi * sigma2)
        return cast(np.ndarray, dval)
    else:  # we are evaluating a multivariate normal
        n, nvars = values_x.shape
        n_means = check_vector(means_x, "bs_multivariate_normal_pdf")
        if n_means != nvars:
            bs_error_abort(f"means_x should be a vector of size {nvars} not {n_means}")
        nrows, ncols = check_matrix(cov_mat, "bs_multivariate_normal_pdf")
        if nrows != ncols or nrows != nvars:
            bs_error_abort(
                f"cov_mat should be a matrix ({nvars}, {nvars}) not ({nrows}, {ncols})"
            )
        resid = values_x - means_x
        argresid = spla.solve(cov_mat, resid.T)
        argexp = np.zeros(n)
        for i in range(n):
            argexp[i] = np.dot(resid[i, :], argresid[:, i])
        dval = np.exp(-0.5 * argexp) / np.sqrt(
            ((2 * np.pi) ** nvars) * spla.det(cov_mat)
        )
        return cast(np.ndarray, dval)

emcee_draw(n_samples, log_pdf, p0, params=None, n_burn_in=100, seed=8754)

Draw samples from a target log-density using emcee.

Parameters:

Name Type Description Default
n_samples int

Number of samples to keep after burn-in.

required
log_pdf Callable[[ndarray, list], float]

Callable returning the log-density; invoked as log_pdf(theta, *params).

required
p0 ndarray

Initial walker states of shape (n_walkers, n_dims).

required
params list | None

Optional list of extra parameters forwarded to log_pdf.

None
n_burn_in int | None

Number of burn-in iterations prior to collecting samples.

100
seed int | None

Seed for the final random subsampling step.

8754

Returns:

Type Description
ndarray

An (n_samples, n_dims) array of draws selected (without replacement when possible)

ndarray

from the flattened emcee chain.

Note

log_pdf should return -np.inf outside the support, and p0 should lie within that support to ensure the sampler mixes correctly.

Source code in bs_python_utils/bsstats.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def emcee_draw(
    n_samples: int,
    log_pdf: Callable[[np.ndarray, list], float],
    p0: np.ndarray,
    params: list | None = None,
    n_burn_in: int | None = 100,
    seed: int | None = 8754,
) -> np.ndarray:
    """Draw samples from a target log-density using ``emcee``.

    Args:
        n_samples: Number of samples to keep after burn-in.
        log_pdf: Callable returning the log-density; invoked as ``log_pdf(theta, *params)``.
        p0: Initial walker states of shape ``(n_walkers, n_dims)``.
        params: Optional list of extra parameters forwarded to ``log_pdf``.
        n_burn_in: Number of burn-in iterations prior to collecting samples.
        seed: Seed for the final random subsampling step.

    Returns:
        An ``(n_samples, n_dims)`` array of draws selected (without replacement when possible)
        from the flattened ``emcee`` chain.

    Note:
        ``log_pdf`` should return ``-np.inf`` outside the support, and ``p0`` should lie within
        that support to ensure the sampler mixes correctly.
    """
    n_walkers, n_dims = p0.shape
    # burn in
    sampler = EnsembleSampler(n_walkers, n_dims, log_pdf, args=tuple(params or ()))
    state = sampler.run_mcmc(p0, n_burn_in)
    sampler.reset()
    # generate the samples
    sampler.run_mcmc(state, n_samples)
    samples = sampler.get_chain(flat=True)
    samples = samples.reshape(-1, n_dims)
    rng = np.random.default_rng(seed)
    replace = samples.shape[0] < n_samples
    samples = rng.choice(samples, size=n_samples, replace=replace, axis=0)
    return cast(np.ndarray, samples)

estimate_pdf(x_obs, x_points, MIN_SIZE_NONPAR=200, weights=None)

Estimate the (possibly weighted) density of x at specified points.

Observations are treated as multivariate when x_obs has two dimensions. For sufficiently large samples a Gaussian KDE with Silverman bandwidth is used; otherwise a normal approximation (with weighted mean/covariance when weights are supplied) is returned.

Parameters:

Name Type Description Default
x_obs ndarray

Observed data, either (n,) or (n, nvars).

required
x_points ndarray

Evaluation points, either (m,) or (m, nvars).

required
MIN_SIZE_NONPAR int

Sample-size threshold controlling the nonparametric vs. Gaussian fallback.

200
weights ndarray | None

Optional observation weights (length n).

None

Returns:

Type Description
ndarray

Array of density estimates at x_points.

Source code in bs_python_utils/bsstats.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def estimate_pdf(
    x_obs: np.ndarray,
    x_points: np.ndarray,
    MIN_SIZE_NONPAR: int = 200,
    weights: np.ndarray | None = None,
) -> np.ndarray:
    """Estimate the (possibly weighted) density of ``x`` at specified points.

    Observations are treated as multivariate when ``x_obs`` has two dimensions. For sufficiently
    large samples a Gaussian KDE with Silverman bandwidth is used; otherwise a normal
    approximation (with weighted mean/covariance when ``weights`` are supplied) is returned.

    Args:
        x_obs: Observed data, either ``(n,)`` or ``(n, nvars)``.
        x_points: Evaluation points, either ``(m,)`` or ``(m, nvars)``.
        MIN_SIZE_NONPAR: Sample-size threshold controlling the nonparametric vs. Gaussian fallback.
        weights: Optional observation weights (length ``n``).

    Returns:
        Array of density estimates at ``x_points``.
    """
    ndims_x = check_vector_or_matrix(x_obs, "estimate_pdf")
    ndims_valx = check_vector_or_matrix(x_points, "estimate_pdf")

    if ndims_x == 1:
        n_obs = x_obs.size
        if ndims_valx != 1:
            bs_error_abort(f"x_points should have one dimension, not {ndims_valx}")
        xt_obs = x_obs.reshape((-1, 1))
        nvars = 1
        xt_points = x_points.reshape((-1, 1))
    else:  # ndims_x == 2
        n_obs, nvars = x_obs.shape
        if ndims_valx == 1:  # only one x point with  nv elements
            nv = x_points.size
        else:  # several x points with  nv elements
            nv = x_points.shape[1]
        if nv != nvars:
            bs_error_abort(f"x_points should have {nvars} variables, not {nv}")
        xt_obs = x_obs
        xt_points = x_points

    if weights is not None:
        n_w = check_vector(weights, "estimate_pdf")
        if n_w != n_obs:
            bs_error_abort(
                f"if weights is given, it should be a vector of size {n_obs} not {n_w}"
            )

    min_size_np = MIN_SIZE_NONPAR ** ((4.0 + nvars) / 5.0)

    if n_obs > min_size_np:  # cell large enough to use nonparametrics
        # fit joint density of x
        kde = sts.gaussian_kde(xt_obs.T, bw_method="silverman", weights=weights)
        # density of x at values_x
        f_x = kde.evaluate(xt_points.T)
    else:
        # sample too small, we fit a normal
        if ndims_x == 1:  # univariate
            if weights is None:
                mean_x = np.mean(x_obs)
                var_x = np.var(x_obs)
            else:
                mean_x = np.average(x_obs, weights=weights)
                var_x = np.average((x_obs - mean_x) ** 2, weights=weights)
            f_x = bs_multivariate_normal_pdf(x_points, mean_x, var_x)
        else:  # multivariate
            if weights is None:
                means_x = np.mean(x_obs, 0)
                cov_mat = np.cov(x_obs.T)
            else:
                weights = weights / np.sum(weights)
                means_x = np.average(x_obs, axis=0, weights=weights)
                centered = x_obs - means_x
                cov_mat = centered.T @ (centered * weights[:, None])
            f_x = bs_multivariate_normal_pdf(x_points, means_x, cov_mat)
    return cast(np.ndarray, f_x)

flexible_reg(Y, X, mode='NP', var_types=None, n_sub=None, n_res=1, verbose=False)

flexible regression of Y on X

Parameters:

Name Type Description Default
Y ndarray

independent variable (nobs) or (nobs, ny)

required
X ndarray

covariates (nobs) or (nobs, nx); should not include a constant term

required
mode str

what flexible means * 'NP': non parametric * 'SPL': spline regression, only on one covariate * '1': linear * '2': quadratic

'NP'
var_types str | None

[for 'NP' only] specify types of all X variables if not all of them are continuous; one character per variable * 'c' for continuous * 'u' discrete unordered * 'o' discrete ordered

None
n_sub int | None

[for 'NP' only] size of subsample for cross-validation; by default it is 200^{(m+4)/5}

None
n_res int

[for 'NP' only] how many subsamples we draw; 1 if None

1
verbose bool

prints stuff if True

False

Returns:

Type Description
ndarray

E(y|X) at the sample points

Source code in bs_python_utils/bsstats.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def flexible_reg(
    Y: np.ndarray,
    X: np.ndarray,
    mode: str = "NP",
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int = 1,
    verbose: bool = False,
) -> np.ndarray:
    """flexible regression  of `Y` on `X`

    Args:
        Y: independent variable `(nobs)` or `(nobs, ny)`
        X: covariates `(nobs)` or `(nobs, nx)`; should **not** include a constant term
        mode: what flexible means
            * 'NP': non parametric
            * 'SPL': spline regression, only on one covariate
            * '1': linear
            * '2': quadratic
        var_types: [for 'NP' only]  specify types of all `X` variables if not all of them are continuous; 
            one character per variable
            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered
        n_sub: [for 'NP' only] size of subsample for cross-validation; \
            by default it is `200^{(m+4)/5}`
        n_res: [for 'NP' only] how many subsamples we draw; 1 if `None`
        verbose: prints stuff if True

    Returns: 
        `E(y|X)` at the sample points
    """
    if mode == "NP":
        if Y.ndim == 2:
            ny = Y.shape[1]
            Y_fit = np.zeros_like(Y)
            for iy in range(ny):
                Y_fit[:, iy] = reg_nonpar_fit(
                    Y[:, iy],
                    X,
                    var_types=var_types,
                    n_sub=n_sub,
                    n_res=n_res,
                    verbose=verbose,
                )
            return Y_fit
        else:
            return reg_nonpar_fit(
                Y, X, var_types=var_types, n_sub=n_sub, n_res=n_res, verbose=verbose
            )
    elif mode == "SPL":
        if X.ndim > 1:
            bs_error_abort("with a spline, only works in one dimension")
        return spline_reg(Y, X)
    else:
        try:
            imode = int(mode)
        except (TypeError, ValueError):
            bs_error_abort(f"does not accept mode={mode}")
        preg, _, _ = proj_Z(Y, X, p=imode, verbose=verbose)
        return preg

kde_resample(data, n_samples, n_bw=10)

Fit a nonparametric density to data by KDE, with cross-validation with starting point at rule of thumb, and generate samples from the estimated density.

Parameters:

Name Type Description Default
data ndarray

an (n_obs, n_dims) matrix of data

required
n_samples int

how many iid draws we want

required
n_bw int

how mamy bandwidths we try from 1/10th to 10 times the rule-of-thumb

10

Returns:

Type Description
tuple[ndarray, float]

an (n_samples, n_dims) matrix of draws, and the bandwidth used.

Source code in bs_python_utils/bsstats.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def kde_resample(
    data: np.ndarray, n_samples: int, n_bw: int = 10
) -> tuple[np.ndarray, float]:
    """Fit a nonparametric density to data by KDE, with cross-validation with starting point at rule of thumb,
    and generate samples from the estimated density.

    Args:
        data: an `(n_obs, n_dims)` matrix of data
        n_samples: how many iid draws we want
        n_bw: how mamy bandwidths we try from 1/10th to 10 times the rule-of-thumb

    Returns:
        an `(n_samples, n_dims)` matrix of draws, and the bandwidth used.
    """
    n_obs, n_dims = check_matrix(data)
    spreads = np.std(data, axis=0, ddof=1)
    h_scale = float(np.mean(spreads))
    if not np.isfinite(h_scale) or h_scale <= 0.0:
        h_scale = 1.0
    h_rot = h_scale * (n_obs ** (-1 / (n_dims + 4)))
    params = {"bandwidth": h_rot * np.logspace(-1, 1, n_bw)}
    grid = GridSearchCV(KernelDensity(), params)
    grid.fit(data)
    kde = grid.best_estimator_
    best_bandwidth = grid.best_params_["bandwidth"]
    resampled_data = kde.sample(n_samples)
    return resampled_data, best_bandwidth

proj_Z(W, Z, p=1, verbose=False)

project W on all interactions of degree p or less of Z

Parameters:

Name Type Description Default
W ndarray

variable(s) (nobs) or (nobs, nw)

required
Z ndarray

instruments (nobs) or(nobs, nz)`; they should not include a constant term

required
p int

maximum total degree for interactions of the columns of Z

1
verbose bool

prints stuff if True

False

Returns:

Type Description
tuple[ndarray, ndarray, float]

the projections of the columns of W on Z etc, the coefficients, and the R^2 of each column

Source code in bs_python_utils/bsstats.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def proj_Z(
    W: np.ndarray, Z: np.ndarray, p: int = 1, verbose: bool = False
) -> tuple[np.ndarray, np.ndarray, float]:
    """project `W` on all interactions of degree `p` or less of `Z`

    Args:
        W: variable(s) `(nobs)` or `(nobs, nw)`
        Z: instruments `(nobs) or `(nobs, nz)`;
            they should **not** include a constant term
        p: maximum total degree for interactions of the columns of `Z`
        verbose: prints stuff if True

    Returns:
        the projections of the columns of `W` on `Z` etc, the coefficients, and the `R^2` of each column
    """
    nobs = Z.shape[0]
    if W.shape[0] != nobs:
        bs_error_abort("W and Z should have the same number of rows")
    if W.ndim > 2:
        bs_error_abort("W should have 1 or 2 dimensions")
    if Z.ndim > 2:
        bs_error_abort("Z should have 1 or 2 dimensions")

    if Z.ndim == 1:
        Zp = np.zeros((nobs, 1 + p))
        Zp[:, 0] = np.ones(nobs)
        for q in range(1, p + 1):
            Zp[:, q] = Z**q
    else:  # Z is a matrix
        Zp, k = _make_Zp(Z, p)
        if verbose:
            print(f"_proj_Z with degree {p}, using {k} regressors")

    return _final_proj(Zp, W)

reg_nonpar(y, X, var_types=None, n_sub=None, n_res=1)

nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size nsub if nsub < nobs, and rescaled.

Parameters:

Name Type Description Default
y ndarray

a vector of size nobs

required
X ndarray

a (nobs) vector or a matrix of shape (nobs, m)

required
var_types str | None

specify types of all X variables if not all of them are continuous; one character per variable

  • 'c' for continuous
  • 'u' discrete unordered
  • 'o' discrete ordered.
None
n_sub int | None

size of subsample for cross-validation; by default it is 200^{(m+4)/5}

None
n_res int | None

how many subsamples we draw; 1 by default

1

Returns:

Type Description
KernelReg

fitted on sample (nobs, with derivatives)

ndarray

and bandwidths (m)

Source code in bs_python_utils/bsstats.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def reg_nonpar(
    y: np.ndarray,
    X: np.ndarray,
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int | None = 1,
) -> tuple[KernelReg, np.ndarray]:
    """nonparametric regression of y on the columns of X;
    the bandwidth is chosen on a subsample of size `nsub` if `nsub` < `nobs`, and rescaled.

    Args:
        y: a vector of size nobs
        X: a (nobs) vector or a matrix of shape (nobs, m)
        var_types: specify types of all `X` variables if not all of them are continuous; one character per variable

            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered.
        n_sub: size of subsample for cross-validation;  by default it is `200^{(m+4)/5}`
        n_res: how many subsamples we draw; 1 by default

    Returns:
        fitted on sample (nobs, with derivatives)
        and bandwidths (m)
    """
    _ = check_vector_or_matrix(X)
    n_obs = check_vector(y)
    if X.shape[0] != n_obs:
        bs_error_abort("X and y should have the same number of observations")
    m = 1 if X.ndim == 1 else X.shape[1]
    if var_types is None:
        types = "c" * m
    else:
        if len(var_types) != m:
            bs_error_abort("var_types should have one entry for each column of X")
        types = var_types

    if n_sub is None:
        n_sub = round(200 ** ((m + 4.0) / 5.0))

    k = KernelReg(
        y,
        X,
        var_type=types,
        defaults=EstimatorSettings(
            efficient=True, n_sub=n_sub, randomize=True, n_res=n_res
        ),
    )
    return k.fit(), k.bw

reg_nonpar_fit(y, X, var_types=None, n_sub=None, n_res=1, verbose=False)

nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size nsub if nsub < nobs, and rescaled.

Parameters:

Name Type Description Default
y ndarray

a vector of size nobs

required
X ndarray

a (nobs) vector or a matrix of shape (nobs, m)

required
var_types str | None

specify types of all X variables if not all of them are continuous; one character per variable

  • 'c' for continuous
  • 'u' discrete unordered
  • 'o' discrete ordered.
None
n_sub int | None

size of subsample for cross-validation; by default it is 200^{(m+4)/5}

None
n_res int

how many subsamples we draw; 1 by default

1
verbose bool

prints stuff if True

False

Returns:

Type Description
ndarray

fitted values on sample (nobs)

Source code in bs_python_utils/bsstats.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def reg_nonpar_fit(
    y: np.ndarray,
    X: np.ndarray,
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int = 1,
    verbose: bool = False,
) -> np.ndarray:
    """nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size `nsub` if `nsub` < `nobs`, and rescaled.

    Args:
        y: a vector of size nobs
        X: a (nobs) vector or a matrix of shape (nobs, m)
        var_types: specify types of all `X` variables if not all of them are continuous; one character per variable

            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered.
        n_sub: size of subsample for cross-validation; by default it is `200^{(m+4)/5}`
        n_res: how many subsamples we draw; 1 by default
        verbose: prints stuff if True

    Returns:
        fitted values on sample (nobs)
    """
    kfbw = reg_nonpar(y, X, var_types, n_sub, n_res)
    fitted_vals = cast(np.ndarray, kfbw[0][0])
    return fitted_vals

tsls(y, X, Z)

TSLS of y on X with instruments Z

Parameters:

Name Type Description Default
y ndarray

independent variable (nobs)

required
X ndarray

covariates (nobs, nx)

required
Z ndarray

instruments (nobs, nz)

required

Returns:

Type Description
TslsResults

a tsls_results object

Source code in bs_python_utils/bsstats.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def tsls(y: np.ndarray, X: np.ndarray, Z: np.ndarray) -> TslsResults:
    """TSLS of `y` on `X` with instruments `Z`

    Args:
        y: independent variable `(nobs)`
        X: covariates `(nobs, nx)`
        Z: instruments `(nobs, nz)`

    Returns:
        a `tsls_results` object
    """
    # first stage
    X_IV_proj, b_proj_IV, r2_first_iv = proj_Z(X, Z)
    # second stage
    y_proj, y_coeffs, r2_y = proj_Z(y, Z)
    _, iv_estimates, r2_second = proj_Z(y_proj, X_IV_proj)
    return TslsResults(
        iv_estimates,
        r2_first_iv,
        r2_y,
        r2_second,
        y_proj,
        y_coeffs,
        X_IV_proj,
        b_proj_IV,
    )