
Agent RBM Module

agent_rbm

AgentRbm

Agent: Implementation of the reduced Bayesian model.

AlAgent

Specifies the properties of the object that implements the reduced Bayesian model.

The model infers the mean of the outcome-generating distribution according to change-point probability and relative uncertainty.
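
In brief, using the notation of the source comments below: on each trial the model computes a change-point probability omega_t from the current prediction error delta_t, combines it with the relative uncertainty tau_t into a learning rate alpha_t = omega_t + tau_t * (1 - omega_t), and updates its belief via mu_{t+1} = mu_t + alpha_t * delta_t.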

Source code in rbmpy/agent_rbm/AgentRbm.py
class AlAgent:
    """Specifies the properties of the object that implements the reduced Bayesian model.

    The model infers the mean of the outcome-generating distribution according to change-point probability and
    relative uncertainty.
    """

    def __init__(self, agent_vars: "AgentVars"):
        """Creates an agent object of class AlAgent based on the agent initialization input.

        Parameters
        ----------
        agent_vars : AgentVars
            Initialization object instance.
        """

        # Set variable task properties based on input
        self.s = agent_vars.s
        self.h = agent_vars.h
        self.u = agent_vars.u
        self.q = agent_vars.q
        self.sigma = agent_vars.sigma
        self.sigma_t_sq = agent_vars.sigma_0
        self.sigma_H = agent_vars.sigma_H
        self.tau_t = agent_vars.tau_0
        self.omega_t = agent_vars.omega_0
        self.mu_t = agent_vars.mu_0
        self.max_x = agent_vars.max_x
        self.circular = agent_vars.circular

        # Initialize variables
        self.a_t = np.nan  # belief update
        self.alpha_t = np.nan  # learning rate
        self.tot_var = np.nan  # total uncertainty
        self.C = np.nan  # term related to catch-trial helicopter cue

    # Futuretodo: Create sub-functions as in sampling agent
    def learn(
        self, delta_t: float, b_t: float, v_t: int, mu_H: float, high_val: int
    ) -> None:
        """Implements the inference of the reduced Bayesian model.

        Parameters
        ----------
        delta_t : float
            Current prediction error.
        b_t : float
            Last prediction of participant.
        v_t : int
            Helicopter visibility.
        mu_H : float
            True helicopter location.
        high_val : int
            High-value index.

        Returns
        -------
        None
            This function does not return any value.

        futuretodo:
        use the "mypy" type checker
        use getters
        """

        if np.isnan(delta_t):
            # Ensure that delta is not NaN so that model is not accidentally applied to wrong data
            sys.exit("delta_t is NaN")

        # Update variance of predictive distribution
        self.tot_var = self.sigma**2 + self.sigma_t_sq

        # Compute change-point probability
        # --------------------------------

        # Likelihood of prediction error given that change point occurred: (1/max_x)^s * h
        term_1 = ((1 / self.max_x) ** self.s) * self.h

        # Likelihood of prediction error given that no change point occurred:
        # (N(delta_t; 0,sigma^2_t + sigma^2))^s * (1-h)
        if self.circular:
            kappa = 1 / self.tot_var
            term_2 = (vonmises.pdf(delta_t, kappa) ** self.s) * (1 - self.h)
        else:
            term_2 = (norm.pdf(delta_t, 0, np.sqrt(self.tot_var)) ** self.s) * (
                1 - self.h
            )

        # Compute change-point probability
        self.omega_t = safe_div(term_1, (term_2 + term_1))

        # Compute learning rate and update belief
        # ---------------------------------------
        self.alpha_t = self.omega_t + self.tau_t - self.tau_t * self.omega_t

        # Add reward bias to learning rate and correct for learning rates > 1 and < 0
        self.alpha_t = self.alpha_t + self.q * high_val
        if self.alpha_t > 1.0:
            self.alpha_t = 1.0
        elif self.alpha_t < 0.0:
            self.alpha_t = 0.0

        # Set model belief equal to last prediction of participant to estimate model using subjective prediction errors
        self.mu_t = b_t

        # hat{a_t} := alpha_t * delta_t
        self.a_t = self.alpha_t * delta_t

        # mu_{t+1} := mu_t + hat{a_t}
        self.mu_t = self.mu_t + self.a_t

        if self.circular:
            # Wrap mu_t around circle
            self.mu_t = self.mu_t % self.max_x

        # On catch trials, take true mean into consideration
        # --------------------------------------------------
        if v_t:

            if self.sigma_H == 0.0:
                # Ensure that sigma_H is not zero
                sys.exit("sigma_H equals 0")

            # Compute weight of true mean
            # w_t := sigma_t^2 / (sigma_t^2 + sigma_H^2)
            w_t = self.sigma_t_sq / (self.sigma_t_sq + self.sigma_H**2)

            # Compute mean of inferred distribution with additional mean information
            # mu_t = (1 - w_t) * mu_{t+1} + w_t * mu_H
            if self.circular:
                self.mu_t = circ_mean(
                    alpha=np.array([self.mu_t, mu_H]), w=np.array([1 - w_t, w_t])
                )
            else:
                self.mu_t = (1 - w_t) * self.mu_t + w_t * mu_H

            # Recompute the model's update under consideration of the catch-trial information
            # \hat{a}_t = mu_{t+1} - b_t, same as in data preprocessing
            if self.circular:
                self.a_t = circ_dist(self.mu_t, b_t)
            else:
                self.a_t = self.mu_t - b_t

            # Compute mixture variance of the two distributions...
            # C := 1 / ((1/sigma_t^2) + (1/sigma_H^2))
            term_1 = safe_div(1, self.sigma_t_sq)
            term_2 = safe_div(1, self.sigma_H**2)
            self.C = safe_div(1, term_1 + term_2)

            # ...and update relative uncertainty accordingly
            # tau_t = C / (C + sigma^2)
            self.tau_t = safe_div(self.C, self.C + self.sigma**2)

            # futuretodo: test model that does not update tau_t
            # after catch trial, but keep in mind subjects don't
            # perfectly trust the cue

        # Update relative uncertainty of the next trial
        # ---------------------------------------------

        # Update estimation uncertainty:
        # sigma_{t+1}^2 := (omega_t * sigma^2
        #                   + (1-omega_t) * tau_t * sigma^2
        #                   + omega_t * (1 - omega_t) * (delta_t * (1 - tau_t))^2) / exp(u)

        # Note that u is already in exponential form
        term_1 = self.omega_t * (self.sigma**2)
        term_2 = (1 - self.omega_t) * self.tau_t * (self.sigma**2)
        term_3 = self.omega_t * (1 - self.omega_t) * ((delta_t * (1 - self.tau_t)) ** 2)
        self.sigma_t_sq = safe_div((term_1 + term_2 + term_3), self.u)

        # Update relative uncertainty:
        # tau_{t+1} := sigma_{t+1}^2 / (sigma_{t+1}^2 + sigma^2)
        self.tau_t = safe_div(self.sigma_t_sq, (self.sigma_t_sq + self.sigma**2))
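
As a worked illustration of the catch-trial branch above, the cue weighting and the resulting uncertainty update can be computed by hand. The values below are made up; this mirrors the formulas in the source but is not package code.

sigma_t_sq = 25.0                                # estimation uncertainty (illustrative)
sigma_H = 1.0                                    # standard deviation of the helicopter cue
sigma = 10.0                                     # outcome noise

w_t = sigma_t_sq / (sigma_t_sq + sigma_H**2)     # weight on the true mean: ~0.962
C = 1 / (1 / sigma_t_sq + 1 / sigma_H**2)        # mixture variance: ~0.962
tau_t = C / (C + sigma**2)                       # updated relative uncertainty: ~0.0095
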
__init__(agent_vars)

Creates an agent object of class AlAgent based on the agent initialization input.

Parameters:

Name        Type       Description                      Default
agent_vars  AgentVars  Initialization object instance.  required
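
For example, default parameters can be overridden on the AgentVars instance before constructing the agent. A minimal sketch; the specific values here are illustrative, not package defaults:

agent_vars = AgentVars()
agent_vars.h = 0.2        # assume a higher hazard rate
agent_vars.sigma = 17.5   # assume noisier outcomes
agent = AlAgent(agent_vars)
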
learn(delta_t, b_t, v_t, mu_H, high_val)

Implements the inference of the reduced Bayesian model.

Parameters:

Name      Type   Description                      Default
delta_t   float  Current prediction error.        required
b_t       float  Last prediction of participant.  required
v_t       int    Helicopter visibility.           required
mu_H      float  True helicopter location.        required
high_val  int    High-value index.                required

Returns:

Type  Description
None  This function does not return any value.
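
To make the learning-rate combination concrete, here is an illustrative computation with made-up values for omega_t and tau_t; it mirrors the bias-and-clip logic in learn() but is not package code:

omega_t = 0.2                                         # change-point probability (illustrative)
tau_t = 0.4                                           # relative uncertainty (illustrative)
q, high_val = 0.1, 1                                  # reward bias applied on a high-value trial

alpha_t = omega_t + tau_t - tau_t * omega_t           # = 0.52
alpha_t = min(max(alpha_t + q * high_val, 0.0), 1.0)  # = 0.62, clipped to [0, 1]
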

AgentVarsRbm

AgentVars: Initialization of the reduced Bayesian model.

AgentVars

Specifies the AgentVars object for the reduced optimal model.

futuretodo: Consider using a data class here
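
The futuretodo above suggests a data class; a hypothetical sketch of that refactor, with the defaults mirroring the source listing that follows (not package code):

from dataclasses import dataclass

@dataclass
class AgentVars:
    s: float = 1.0          # surprise sensitivity
    h: float = 0.1          # hazard rate
    u: float = 0.0          # uncertainty underestimation
    q: float = 0.0          # reward bias
    sigma: float = 10.0     # noise in the environment (standard deviation)
    sigma_0: float = 100.0  # initial variance of predictive distribution
    sigma_H: float = 1.0    # catch-trial standard deviation of hidden-mean cue
    tau_0: float = 0.5      # initial relative uncertainty
    omega_0: float = 1.0    # initial change-point probability
    mu_0: float = 150.0     # initial belief about mean
    max_x: float = 300.0    # maximum outcome
    circular: bool = False  # circular vs. linear outcome space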

Source code in rbmpy/agent_rbm/AgentVarsRbm.py
class AgentVars:
    """Specifies the AgentVars object for the reduced optimal model.

    futuretodo: Consider using a data class here
    """

    def __init__(self):
        # Determines the default agent variables.

        self.s = 1  # surprise sensitivity
        self.h = 0.1  # hazard rate
        self.u = 0.0  # uncertainty underestimation
        self.q = 0  # reward bias
        self.sigma = 10  # noise in the environment (standard deviation)
        self.sigma_0 = 100  # initial variance of predictive distribution
        self.sigma_H = 1  # catch-trial standard deviation of hidden-mean cue
        self.tau_0 = 0.5  # initial relative uncertainty
        self.omega_0 = 1  # initial change-point probability
        self.mu_0 = 150  # initial belief about mean
        self.max_x = 300  # maximum outcome
        self.circular = False  # circular vs. linear outcome space
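
Putting both classes together, a minimal end-to-end sketch might look as follows. The import paths are inferred from the source file locations shown above, and the simulation loop (change points drawn at the hazard rate, the model fed its own predictions as b_t) is illustrative rather than part of the package. Note that u is divided out "in exponential form" in the source, so the raw default u = 0.0 is replaced with exp(0) = 1 here (an assumption about how the package transforms this parameter):

import numpy as np

from rbmpy.agent_rbm.AgentRbm import AlAgent
from rbmpy.agent_rbm.AgentVarsRbm import AgentVars

rng = np.random.default_rng(1)

agent_vars = AgentVars()
agent_vars.u = 1.0                               # exp(0) = 1; see note above
agent = AlAgent(agent_vars)

mu_true = 150.0                                  # hidden mean ("helicopter" location)
for t in range(10):
    if rng.random() < agent_vars.h:              # change point at hazard rate h = 0.1
        mu_true = rng.uniform(0, agent_vars.max_x)
    x_t = rng.normal(mu_true, agent_vars.sigma)  # outcome with standard deviation sigma = 10
    b_t = agent.mu_t                             # feed the model its own prediction
    agent.learn(delta_t=x_t - b_t, b_t=b_t, v_t=0, mu_H=np.nan, high_val=0)
    print(t, round(agent.omega_t, 3), round(agent.alpha_t, 3), round(agent.mu_t, 1))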