bsstats module

Contains some statistical routines.

TslsResults dataclass

contains the full results of a TSLS regression

Source code in bs_python_utils/bsstats.py
@dataclass
class TslsResults:
    """contains the full results of a TSLS regression"""

    iv_estimates: float | np.ndarray | None
    r2_first_iv: float | np.ndarray | None
    r2_y: float | None
    r2_second: float | np.ndarray | None
    y_proj: float | np.ndarray | None
    y_coeffs: float | np.ndarray | None
    X_IV_proj: float | np.ndarray | None
    b_proj_IV: float | np.ndarray | None

bs_multivariate_normal_pdf(values_x, means_x, cov_mat)

Multivariate (or univariate) normal probability density function at values_x

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `values_x` | ndarray | values at which to evaluate the pdf, an `n`-vector or an `(n, nvars)` matrix | required |
| `means_x` | float or ndarray | means of the multivariate normal, a float or an `(nvars)` vector | required |
| `cov_mat` | float or ndarray | covariance matrix of the multivariate normal, a float or an `(nvars, nvars)` matrix | required |

Returns:

ndarray: the values of the density at `values_x`

Source code in bs_python_utils/bsstats.py
def bs_multivariate_normal_pdf(
    values_x: np.ndarray, means_x: float | np.ndarray, cov_mat: float | np.ndarray
) -> np.ndarray:
    """Multivariate (or univariate) normal probability density function at values_x

    Args:
        values_x: values at which to evaluate the pdf, an `n`-vector or an `(n, nvars)` matrix
        means_x: means of the multivariate normal, a float or an `(nvars)` vector
        cov_mat: covariance matrix of the multivariate normal, a float or an `(nvars, nvars)` matrix

    Returns:
        the values of the density at `values_x`
    """
    ndims_values = check_vector_or_matrix(values_x, "bs_multivariate_normal_pdf")
    if ndims_values == 1:  # we are evaluating a univariate normal
        # if not type(means_x) == float:
        #     bs_error_abort(f"means_x should be a float as values_x is a vector")
        # if not type(cov_mat) == float:
        #     bs_error_abort(f"cov_mat should be a float as values_x is a vector")
        sigma2 = cov_mat
        resid = values_x - means_x
        dval = np.exp(-0.5 * resid * resid / sigma2) / np.sqrt(2 * np.pi * sigma2)
        return cast(np.ndarray, dval)
    else:  # we are evaluating a multivariate normal
        n, nvars = values_x.shape
        n_means = check_vector(means_x, "bs_multivariate_normal_pdf")
        if n_means != nvars:
            bs_error_abort(f"means_x should be a vector of size {nvars} not {n_means}")
        nrows, ncols = check_matrix(cov_mat, "bs_multivariate_normal_pdf")
        if nrows != ncols or nrows != nvars:
            bs_error_abort(
                f"cov_mat should be a matrix ({nvars}, {nvars}) not ({nrows}, {ncols})"
            )
        resid = values_x - means_x
        argresid = spla.solve(cov_mat, resid.T)
        argexp = np.zeros(n)
        for i in range(n):
            argexp[i] = np.dot(resid[i, :], argresid[:, i])
        dval = np.exp(-0.5 * argexp) / np.sqrt(
            ((2 * np.pi) ** nvars) * spla.det(cov_mat)
        )
        return cast(np.ndarray, dval)
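A minimal usage sketch (the import path follows the source location shown above; the test points and the cross-check against scipy are illustrative):

```python
import numpy as np
from scipy.stats import multivariate_normal

from bs_python_utils.bsstats import bs_multivariate_normal_pdf

rng = np.random.default_rng(0)

# univariate: values_x is an n-vector, means_x and cov_mat are floats
x = rng.normal(size=5)
dens_uni = bs_multivariate_normal_pdf(x, 1.0, 4.0)  # N(1, variance 4)

# bivariate: values_x is (n, nvars), means_x is (nvars,), cov_mat is (nvars, nvars)
means = np.array([0.0, 1.0])
cov = np.array([[2.0, 0.5], [0.5, 1.0]])
pts = rng.normal(size=(5, 2))
dens_biv = bs_multivariate_normal_pdf(pts, means, cov)

# the bivariate values should match scipy's multivariate normal pdf
assert np.allclose(dens_biv, multivariate_normal(mean=means, cov=cov).pdf(pts))
```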

emcee_draw(n_samples, log_pdf, p0, params=None, n_burn_in=100, seed=8754)

uses MCMC to draw n_samples samples from the log pdf with given parameters

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `n_samples` | int | the number of samples to draw | required |
| `log_pdf` | Callable[[ndarray, list], float] | the log of the pdf, `log_pdf(x, *params)`; returns a float for one sample array | required |
| `params` | list or None | a list of parameters | None |
| `p0` | ndarray | the initial values for the walkers | required |
| `n_burn_in` | int or None | the number of burn-in iterations for MCMC | 100 |
| `seed` | int or None | used to randomly draw the samples from the walkers | 8754 |

Returns:

ndarray: an `(n_samples, n_dims)` array of samples.

Warning

The log_pdf function should return minus infinity outside of the support of the pdf, and p0 should be contained in that support.

Source code in bs_python_utils/bsstats.py
def emcee_draw(
    n_samples: int,
    log_pdf: Callable[[np.ndarray, list], float],
    p0: np.ndarray,
    params: list | None = None,
    n_burn_in: int | None = 100,
    seed: int | None = 8754,
) -> np.ndarray:
    """uses MCMC to draw `n_samples` samples from the log pdf with given parameters

    Args:
        n_samples: the number of samples to draw
        log_pdf: the log of the pdf, `log_pdf(x, *params)`;
            returns a float for one sample array
        p0: the initial values for the walkers
        params: a list of parameters
        n_burn_in: the number of burn-in iterations for MCMC
        seed: to randomly draw the samples from the walkers

    Returns:
        an `(n_samples, n_dims)` array of samples.

    Warning:
        The `log_pdf` function should return minus infinity
        outside of the support of the pdf, and `p0` should be contained
        in that support.
    """
    n_walkers, n_dims = p0.shape
    # burn in
    sampler = EnsembleSampler(n_walkers, n_dims, log_pdf, args=params)
    state = sampler.run_mcmc(p0, n_burn_in)
    sampler.reset()
    # generate the samples
    sampler.run_mcmc(state, n_samples)
    samples = sampler.get_chain(flat=True)
    rng = np.random.default_rng(seed)
    samples = rng.choice(samples, size=n_samples, replace=False)
    return cast(np.ndarray, samples)
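A sketch of a typical call, assuming the module import path shown above. The target here is a bivariate standard normal, so `params` is left at its default and `log_pdf` only takes the sample array; the walker setup and sample sizes are illustrative.

```python
import numpy as np

from bs_python_utils.bsstats import emcee_draw


def log_pdf(x: np.ndarray) -> float:
    # log density of a bivariate standard normal, up to an additive constant;
    # its support is all of R^2, so we never need to return -inf here
    return -0.5 * float(x @ x)


n_walkers, n_dims = 16, 2
rng = np.random.default_rng(0)
p0 = rng.normal(size=(n_walkers, n_dims))  # start the walkers near the mode

samples = emcee_draw(1_000, log_pdf, p0, n_burn_in=200)
print(samples.shape)          # (1000, 2)
print(samples.mean(axis=0))   # should be roughly (0, 0)
```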

estimate_pdf(x_obs, x_points, MIN_SIZE_NONPAR=200, weights=None)

return an estimate of the density of x at the points x_points (Silverman rule)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `x_obs` | ndarray | an `n`-vector or an `(n, nvars)` matrix of the observed values of `x` | required |
| `x_points` | ndarray | an `m`-vector or an `(m, nvars)` matrix of `x` values | required |
| `MIN_SIZE_NONPAR` | int | minimum size above which we use kernel density estimators | 200 |
| `weights` | ndarray or None | an `n`-vector of weights for the observations, if present | None |

Returns:

ndarray: the density estimates at `x_points`

Source code in bs_python_utils/bsstats.py
def estimate_pdf(
    x_obs: np.ndarray,
    x_points: np.ndarray,
    MIN_SIZE_NONPAR: int = 200,
    weights: np.ndarray | None = None,
) -> np.ndarray:
    """return an estimate of the conditional densities of `x` at points `values_x` (Silverman rule)

    Args:
        x_obs: an `n`-vector or an `(n, nvars)` matrix of the observed values of `x`
        x_points: an `m`-vector or an `(m, nvars)` matrix of x values
        MIN_SIZE_NONPAR: minimum size above which we use kernel density estimators
        weights: an `n`-vector of weights for the observations, if present

    Returns:
        the density estimates at `x_points`
    """
    ndims_x = check_vector_or_matrix(x_obs, "estimate_pdf")
    ndims_valx = check_vector_or_matrix(x_points, "estimate_pdf")

    if ndims_x == 1:
        n_obs = x_obs.size
        if ndims_valx != 1:
            bs_error_abort(f"x_points should have one dimension, not {ndims_valx}")
        xt_obs = x_obs.reshape((-1, 1))
        nvars = 1
        xt_points = x_points.reshape((-1, 1))
    else:  # ndims_x == 2
        n_obs, nvars = x_obs.shape
        if ndims_valx == 1:  # only one x point with  nv elements
            nv = x_points.size
        else:  # several x points with  nv elements
            nv = x_points.shape[1]
        if nv != nvars:
            bs_error_abort(f"x_points should have {nvars} variables, not {nv}")
        xt_obs = x_obs
        xt_points = x_points

    if weights is not None:
        n_w = check_vector(weights, "estimate_pdf")
        if n_w != n_obs:
            bs_error_abort(
                f"if weights is given, it should be a vector of size {n_obs} not {n_w}"
            )

    min_size_np = MIN_SIZE_NONPAR ** ((4.0 + nvars) / 5.0)

    if n_obs > min_size_np:  # cell large enough to use nonparametrics
        # fit joint density of x
        kde = sts.gaussian_kde(xt_obs.T, bw_method="silverman", weights=weights)
        # density of x at x_points
        f_x = kde.evaluate(xt_points.T)
    else:
        # sample too small, we fit a normal
        if ndims_x == 1:  # univariate
            mean_x = np.mean(x_obs)
            var_x = np.var(x_obs)
            f_x = bs_multivariate_normal_pdf(x_points, mean_x, var_x)
        else:  # multivariate
            means_x = np.mean(x_obs, 0)
            cov_mat = np.cov(x_obs.T)
            f_x = bs_multivariate_normal_pdf(x_points, means_x, cov_mat)
        if weights is not None:
            f_x *= weights / np.mean(weights)
    return cast(np.ndarray, f_x)
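A usage sketch on simulated data; the two sample sizes are chosen only to show the two branches. With enough observations the Silverman-rule kernel estimator is used, otherwise a normal density is fitted instead.

```python
import numpy as np

from bs_python_utils.bsstats import estimate_pdf

rng = np.random.default_rng(0)
x_obs = rng.normal(size=1_000)
x_points = np.linspace(-3.0, 3.0, 7)

f_kde = estimate_pdf(x_obs, x_points)          # large sample: Silverman-rule KDE
f_normal = estimate_pdf(x_obs[:50], x_points)  # small sample: fitted normal
print(f_kde.shape, f_normal.shape)             # (7,) (7,)
```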

flexible_reg(Y, X, mode='NP', var_types=None, n_sub=None, n_res=1, verbose=False)

flexible regression of Y on X

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `Y` | ndarray | dependent variable, `(nobs)` or `(nobs, ny)` | required |
| `X` | ndarray | covariates, `(nobs)` or `(nobs, nx)`; should not include a constant term | required |
| `mode` | str | what "flexible" means: 'NP' nonparametric; 'SPL' spline regression, only on one covariate; '1' linear; '2' quadratic | 'NP' |
| `var_types` | str or None | [for 'NP' only] types of all `X` variables if not all of them are continuous, one character per variable: 'c' continuous, 'u' discrete unordered, 'o' discrete ordered | None |
| `n_sub` | int or None | [for 'NP' only] size of the subsample used for cross-validation; by default it is `200^{(m+4)/5}` | None |
| `n_res` | int | [for 'NP' only] how many subsamples we draw; 1 if None | 1 |
| `verbose` | bool | prints stuff if True | False |

Returns:

ndarray: the fitted values `E(y|X)` at the sample points

Source code in bs_python_utils/bsstats.py
def flexible_reg(
    Y: np.ndarray,
    X: np.ndarray,
    mode: str = "NP",
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int = 1,
    verbose: bool = False,
) -> np.ndarray:
    """flexible regression  of `Y` on `X`

    Args:
        Y: dependent variable `(nobs)` or `(nobs, ny)`
        X: covariates `(nobs)` or `(nobs, nx)`; should **not** include a constant term
        mode: what flexible means
            * 'NP': non parametric
            * 'SPL': spline regression, only on one covariate
            * '1': linear
            * '2': quadratic
        var_types: [for 'NP' only]  specify types of all `X` variables if not all of them are continuous; 
            one character per variable
            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered
        n_sub: [for 'NP' only] size of subsample for cross-validation; \
            by default it is `200^{(m+4)/5}`
        n_res: [for 'NP' only] how many subsamples we draw; 1 if `None`
        verbose: prints stuff if True

    Returns: 
        `E(y|X)` at the sample points
    """
    if mode == "NP":
        if Y.ndim == 2:
            ny = Y.shape[1]
            Y_fit = np.zeros_like(Y)
            for iy in range(ny):
                Y_fit[:, iy] = reg_nonpar_fit(
                    Y[:, iy],
                    X,
                    var_types=var_types,
                    n_sub=n_sub,
                    n_res=n_res,
                    verbose=verbose,
                )
            return Y_fit
        else:
            return reg_nonpar_fit(
                Y, X, var_types=var_types, n_sub=n_sub, n_res=n_res, verbose=verbose
            )
    elif mode == "SPL":
        if X.ndim > 1:
            bs_error_abort("with a spline, only works in one dimension")
        return spline_reg(Y, X)
    else:
        try:
            imode = int(mode)
        except (TypeError, ValueError):
            bs_error_abort(f"does not accept mode={mode}")
        preg, _, _ = proj_Z(Y, X, p=imode, verbose=verbose)
        return preg
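A sketch comparing two of the modes on simulated data with a quadratic conditional mean; the data-generating process is illustrative only.

```python
import numpy as np

from bs_python_utils.bsstats import flexible_reg

rng = np.random.default_rng(0)
n_obs = 500
X = rng.uniform(-2.0, 2.0, size=n_obs)
Y = 1.0 + X - 0.5 * X * X + 0.1 * rng.normal(size=n_obs)

Ey_quad = flexible_reg(Y, X, mode="2")    # quadratic projection on (1, X, X^2)
Ey_np = flexible_reg(Y, X, mode="NP")     # kernel regression (slower)
print(np.corrcoef(Ey_quad, Ey_np)[0, 1])  # the two fits should be highly correlated
```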

kde_resample(data, n_samples, n_bw=10)

Fit a nonparametric density to the data by KDE, selecting the bandwidth by cross-validation around the rule-of-thumb value, and generate samples from the estimated density.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | ndarray | an `(n_obs, n_dims)` matrix of data | required |
| `n_samples` | int | how many iid draws we want | required |
| `n_bw` | int | how many bandwidths we try, from 1/10th to 10 times the rule of thumb | 10 |

Returns:

tuple[ndarray, float]: an `(n_samples, n_dims)` matrix of draws, and the bandwidth used.

Source code in bs_python_utils/bsstats.py
def kde_resample(
    data: np.ndarray, n_samples: int, n_bw: int = 10
) -> tuple[np.ndarray, float]:
    """Fit a nonparametric density to data by KDE, with cross-validation with starting point at rule of thumb,
    and generate samples from the estimated density.

    Args:
        data: an `(n_obs, n_dims)` matrix of data
        n_samples: how many iid draws we want
        n_bw: how many bandwidths we try, from 1/10th to 10 times the rule of thumb

    Returns:
        an `(n_samples, n_dims)` matrix of draws, and the bandwidth used.
    """
    n_obs, n_dims = check_matrix(data)
    h_rot = np.std(data) * (n_obs ** (-1 / (n_dims + 4)))
    params = {"bandwidth": h_rot * np.logspace(-1, 1, n_bw)}
    grid = GridSearchCV(KernelDensity(), params)
    grid.fit(data)
    kde = grid.best_estimator_
    best_bandwidth = grid.best_params_["bandwidth"]
    resampled_data = kde.sample(n_samples)
    return resampled_data, best_bandwidth
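A resampling sketch on simulated bivariate data; the parameters of the simulated distribution are illustrative.

```python
import numpy as np

from bs_python_utils.bsstats import kde_resample

rng = np.random.default_rng(0)
data = rng.multivariate_normal([0.0, 1.0], [[1.0, 0.3], [0.3, 0.5]], size=400)

draws, bandwidth = kde_resample(data, n_samples=1_000)
print(draws.shape, bandwidth)                  # (1000, 2) and the selected bandwidth
print(data.mean(axis=0), draws.mean(axis=0))   # the two means should be close
```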

proj_Z(W, Z, p=1, verbose=False)

project W on all interactions of degree p or less of Z

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `W` | ndarray | variable(s), `(nobs)` or `(nobs, nw)` | required |
| `Z` | ndarray | instruments, `(nobs)` or `(nobs, nz)`; they should not include a constant term | required |
| `p` | int | maximum total degree for interactions of the columns of `Z` | 1 |
| `verbose` | bool | prints stuff if True | False |

Returns:

tuple[ndarray, ndarray, float]: the projections of the columns of `W` on `Z` and its interactions, the coefficients, and the `R^2` of each column.

Source code in bs_python_utils/bsstats.py
def proj_Z(
    W: np.ndarray, Z: np.ndarray, p: int = 1, verbose: bool = False
) -> tuple[np.ndarray, np.ndarray, float]:
    """project `W` on all interactions of degree `p` or less of `Z`

    Args:
        W: variable(s) `(nobs)` or `(nobs, nw)`
        Z: instruments, `(nobs)` or `(nobs, nz)`;
            they should **not** include a constant term
        p: maximum total degree for interactions of the columns of `Z`
        verbose: prints stuff if True

    Returns:
        the projections of the columns of `W` on `Z` and its interactions, the coefficients, and the `R^2` of each column
    """
    nobs = Z.shape[0]
    if W.shape[0] != nobs:
        bs_error_abort("W and Z should have the same number of rows")
    if W.ndim > 2:
        bs_error_abort("W should have 1 or 2 dimensions")
    if Z.ndim > 2:
        bs_error_abort("Z should have 1 or 2 dimensions")

    if Z.ndim == 1:
        Zp = np.zeros((nobs, 1 + p))
        Zp[:, 0] = np.ones(nobs)
        for q in range(1, p + 1):
            Zp[:, q] = Z**q
    else:  # Z is a matrix
        Zp, k = _make_Zp(Z, p)
        if verbose:
            print(f"_proj_Z with degree {p}, using {k} regressors")

    return _final_proj(Zp, W)
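A sketch of a degree-2 projection on simulated instruments; the coefficients in the data-generating process are illustrative.

```python
import numpy as np

from bs_python_utils.bsstats import proj_Z

rng = np.random.default_rng(0)
n_obs = 1_000
Z = rng.normal(size=(n_obs, 2))
W = 2.0 + Z[:, 0] - Z[:, 1] + 0.5 * Z[:, 0] * Z[:, 1] + 0.1 * rng.normal(size=n_obs)

# project W on the constant and all monomials in Z of total degree at most 2
W_proj, coeffs, r2 = proj_Z(W, Z, p=2, verbose=True)
print(r2)   # close to 1, since W is almost an exact quadratic function of Z
```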

reg_nonpar(y, X, var_types=None, n_sub=None, n_res=1)

nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size n_sub if n_sub < nobs, and rescaled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `y` | ndarray | a vector of size `nobs` | required |
| `X` | ndarray | an `(nobs)` vector or a matrix of shape `(nobs, m)` | required |
| `var_types` | str or None | types of all `X` variables if not all of them are continuous, one character per variable: 'c' continuous, 'u' discrete unordered, 'o' discrete ordered | None |
| `n_sub` | int or None | size of the subsample used for cross-validation; by default it is `200^{(m+4)/5}` | None |
| `n_res` | int or None | how many subsamples we draw | 1 |

Returns:

KernelReg: the fit on the sample (`nobs` fitted values, with derivatives)
ndarray: the bandwidths (`m`)

Source code in bs_python_utils/bsstats.py
def reg_nonpar(
    y: np.ndarray,
    X: np.ndarray,
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int | None = 1,
) -> tuple[KernelReg, np.ndarray]:
    """nonparametric regression of y on the columns of X;
    the bandwidth is chosen on a subsample of size `nsub` if `nsub` < `nobs`, and rescaled.

    Args:
        y: a vector of size nobs
        X: a (nobs) vector or a matrix of shape (nobs, m)
        var_types: specify types of all `X` variables if not all of them are continuous; one character per variable

            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered.
        n_sub: size of subsample for cross-validation;  by default it is `200^{(m+4)/5}`
        n_res: how many subsamples we draw; 1 by default

    Returns:
        the fit on the sample (`nobs` fitted values, with derivatives)
        and the bandwidths (`m`)
    """
    _ = check_vector_or_matrix(X)
    n_obs = check_vector(y)
    if X.shape[0] != n_obs:
        bs_error_abort("X and y should have the same number of observations")
    m = 1 if X.ndim == 1 else X.shape[1]
    if var_types is None:
        types = "c" * m
    else:
        if len(var_types) != m:
            bs_error_abort("var_types should have one entry for each column of X")
        types = var_types

    if n_sub is None:
        n_sub = round(200 ** ((m + 4.0) / 5.0))

    k = KernelReg(
        y,
        X,
        var_type=types,
        defaults=EstimatorSettings(
            efficient=True, n_sub=n_sub, randomize=True, n_res=n_res
        ),
    )
    return k.fit(), k.bw
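A usage sketch on a simulated univariate regression; in the source above the first return value is `k.fit()`, which in statsmodels unpacks into fitted values and marginal effects. The data are illustrative.

```python
import numpy as np

from bs_python_utils.bsstats import reg_nonpar

rng = np.random.default_rng(0)
n_obs = 400
X = rng.uniform(0.0, 1.0, size=n_obs)
y = np.sin(2.0 * np.pi * X) + 0.2 * rng.normal(size=n_obs)

(fitted, marginal_effects), bandwidths = reg_nonpar(y, X)
print(fitted.shape)   # (400,)
print(bandwidths)     # one bandwidth per column of X
```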

reg_nonpar_fit(y, X, var_types=None, n_sub=None, n_res=1, verbose=False)

nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size n_sub if n_sub < nobs, and rescaled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `y` | ndarray | a vector of size `nobs` | required |
| `X` | ndarray | an `(nobs)` vector or a matrix of shape `(nobs, m)` | required |
| `var_types` | str or None | types of all `X` variables if not all of them are continuous, one character per variable: 'c' continuous, 'u' discrete unordered, 'o' discrete ordered | None |
| `n_sub` | int or None | size of the subsample used for cross-validation; by default it is `200^{(m+4)/5}` | None |
| `n_res` | int | how many subsamples we draw | 1 |
| `verbose` | bool | prints stuff if True | False |

Returns:

ndarray: the fitted values on the sample (`nobs`)

Source code in bs_python_utils/bsstats.py
def reg_nonpar_fit(
    y: np.ndarray,
    X: np.ndarray,
    var_types: str | None = None,
    n_sub: int | None = None,
    n_res: int = 1,
    verbose: bool = False,
) -> np.ndarray:
    """nonparametric regression of y on the columns of X; the bandwidth is chosen on a subsample of size `nsub` if `nsub` < `nobs`, and rescaled.

    Args:
        y: a vector of size nobs
        X: a (nobs) vector or a matrix of shape (nobs, m)
        var_types: specify types of all `X` variables if not all of them are continuous; one character per variable

            * 'c' for continuous
            * 'u' discrete unordered
            * 'o' discrete ordered.
        n_sub: size of subsample for cross-validation; by default it is `200^{(m+4)/5}`
        n_res: how many subsamples we draw; 1 by default
        verbose: prints stuff if True

    Returns:
        fitted values on sample (nobs)
    """
    kfbw = reg_nonpar(y, X, var_types, n_sub, n_res)
    fitted_vals = cast(np.ndarray, kfbw[0][0])
    return fitted_vals
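A sketch with one continuous and one ordered discrete regressor, using `var_types` and an explicit cross-validation subsample size; the simulated data are illustrative.

```python
import numpy as np

from bs_python_utils.bsstats import reg_nonpar_fit

rng = np.random.default_rng(0)
n_obs = 400
x_cont = rng.normal(size=n_obs)
x_disc = rng.integers(0, 3, size=n_obs)      # takes the values 0, 1, 2
X = np.column_stack((x_cont, x_disc))
y = x_cont * x_cont + x_disc + 0.2 * rng.normal(size=n_obs)

# 'c' for the continuous column, 'o' for the ordered discrete one
y_fit = reg_nonpar_fit(y, X, var_types="co", n_sub=200)
print(y_fit.shape)   # (400,)
```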

tsls(y, X, Z)

TSLS of y on X with instruments Z

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `y` | ndarray | dependent variable, `(nobs)` | required |
| `X` | ndarray | covariates, `(nobs, nx)` | required |
| `Z` | ndarray | instruments, `(nobs, nz)` | required |

Returns:

TslsResults: a `TslsResults` object with the full results.

Source code in bs_python_utils/bsstats.py
def tsls(y: np.ndarray, X: np.ndarray, Z: np.ndarray) -> TslsResults:
    """TSLS of `y` on `X` with instruments `Z`

    Args:
        y: dependent variable `(nobs)`
        X: covariates `(nobs, nx)`
        Z: instruments `(nobs, nz)`

    Returns:
        a `TslsResults` object
    """
    # first stage
    X_IV_proj, b_proj_IV, r2_first_iv = proj_Z(X, Z)
    # second stage
    y_proj, y_coeffs, r2_y = proj_Z(y, Z)
    _, iv_estimates, r2_second = proj_Z(y_proj, X_IV_proj)
    return TslsResults(
        iv_estimates,
        r2_first_iv,
        r2_y,
        r2_second,
        y_proj,
        y_coeffs,
        X_IV_proj,
        b_proj_IV,
    )
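
A simulation sketch with one endogenous regressor and two instruments; all numbers are illustrative. The endogeneity comes from the error component `v` shared by the first stage and the structural equation; the returned `TslsResults` fields are then available as attributes.

```python
import numpy as np

from bs_python_utils.bsstats import tsls

rng = np.random.default_rng(0)
n_obs = 2_000
Z = rng.normal(size=(n_obs, 2))                     # instruments
v = rng.normal(size=n_obs)                          # first-stage error
X = (Z @ np.array([1.0, 0.5]) + v).reshape(-1, 1)   # one endogenous regressor
u = 0.8 * v + rng.normal(size=n_obs)                # structural error, correlated with X
y = 2.0 + 1.5 * X[:, 0] + u

results = tsls(y, X, Z)
print(results.iv_estimates)   # includes the constant added by proj_Z; the slope should be close to 1.5
print(results.r2_first_iv)    # strength of the first stage
```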