How to use a custom metric with TensorFlow Agents

In this article, I am going to implement a custom TensorFlow Agents metric that calculates the maximum discounted reward obtained in a single episode.

First, I have to import the metric-related modules and the driver module (the driver runs the simulation). Additionally, I need an environment. I’m going to use the one I implemented in this article.

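# numpy, tensorflow, and random_tf_policy are used by the code later in this article
import numpy as np
import tensorflow as tf
from tf_agents.metrics import tf_py_metric
from tf_agents.metrics import py_metric
from tf_agents.drivers import py_driver
from tf_agents.drivers import dynamic_episode_driver
from tf_agents.policies import random_tf_policy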

My metric needs to store the rewards and discounts from the current episode and the maximum discounted total score seen so far. For that, I need two lists (for the rewards and discounts of the current episode) and one variable to keep the maximum.

class MaxEpisodeScoreMetric(py_metric.PyStepMetric):
  def __init__(self, name='MaxEpisodeScoreMetric'):
    # pass the subclass itself to super() so that no parent initializer is skipped
    super(MaxEpisodeScoreMetric, self).__init__(name)
    self.rewards = []
    self.discounts = []
    self.max_discounted_reward = None
    self.reset()

The reset function is mandatory, and it allows the metric instance to be reused by separate driver runs.

  # add it inside the MaxEpisodeScoreMetric class
  def reset(self):
    self.rewards = []
    self.discounts = []
    self.max_discounted_reward = None

In the call function, I append the reward and discount of the current step to the lists. Then, if the current step is also the last step of an episode, I calculate the discounted reward of the whole episode: the first reward is not discounted, and every later reward is multiplied by the discount reported one step earlier.

After that, I compare the total discounted reward of the current episode with the maximum. If the new value is larger than the current maximum, I replace the maximum with it.
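To make the arithmetic concrete, here is a minimal sketch of that weighting in plain NumPy. The function name `shifted_discounted_sum` and the sample rewards and discounts are made up for illustration; they do not come from the environment used in this article.

```python
import numpy as np

def shifted_discounted_sum(rewards, discounts):
    """Weight each reward by the discount recorded one step earlier, then sum."""
    # The first reward is not discounted, so the weights start with 1.0.
    # The last discount is dropped because no step follows it.
    weights = [1.0] + list(discounts[:-1])
    return float(np.sum(np.multiply(rewards, weights)))

# Hypothetical three-step episode.
rewards = [1.0, 2.0, 3.0]
discounts = [0.5, 0.5, 0.0]

total = shifted_discounted_sum(rewards, discounts)
print(total)  # 1.0*1.0 + 2.0*0.5 + 3.0*0.5 = 3.5
```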

Because the instance is not reset between episodes, I need to clear the lists I use to keep the episode rewards and discounts.

  # add it inside the MaxEpisodeScoreMetric class
  def call(self, trajectory):
    # += extends the lists with the elements of the (batched) reward and discount arrays
    self.rewards += trajectory.reward
    self.discounts += trajectory.discount

    if trajectory.is_last():
      # each reward is weighted by the discount reported one step earlier,
      # so shift the discounts right by one; the first reward is not discounted
      adjusted_discounts = [1.0] + self.discounts
      # drop the discount of the last step: no step follows it, so it is never used
      adjusted_discounts = adjusted_discounts[:-1]
      discounted_reward = np.sum(np.multiply(self.rewards, adjusted_discounts))
      print(self.rewards, adjusted_discounts, discounted_reward)  # debug output

      if self.max_discounted_reward is None:
        self.max_discounted_reward = discounted_reward

      if discounted_reward > self.max_discounted_reward:
        self.max_discounted_reward = discounted_reward

      # start the next episode with empty lists
      self.rewards = []
      self.discounts = []
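Note that the calculation above multiplies each reward by a single discount value. If you would rather compound the discounts, so that the reward at step t is weighted by the product of all earlier discounts, a variant based on `np.cumprod` could look like the sketch below. This is an alternative, not what the metric in this article computes; the function name and the sample values are hypothetical.

```python
import numpy as np

def compounded_discounted_sum(rewards, discounts):
    """Sum of rewards, each weighted by the product of all earlier discounts."""
    rewards = np.asarray(rewards, dtype=np.float64)
    discounts = np.asarray(discounts, dtype=np.float64)
    # weights[0] = 1, weights[t] = discounts[0] * ... * discounts[t - 1]
    weights = np.concatenate(([1.0], np.cumprod(discounts[:-1])))
    return float(np.sum(rewards * weights))

# Hypothetical three-step episode.
rewards = [1.0, 2.0, 4.0]
discounts = [0.5, 0.5, 0.0]

compounded = compounded_discounted_sum(rewards, discounts)
print(compounded)  # 1.0*1.0 + 2.0*0.5 + 4.0*0.25 = 3.0
```

With a constant discount the two approaches differ from the third step onward, so it is worth deciding which definition of the episode score you want before comparing results.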

In the result function, I don’t need to perform any additional operations, so I return the maximal discounted total reward.

  # add it inside the MaxEpisodeScoreMetric class
  def result(self):
    return self.max_discounted_reward
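The bookkeeping of the metric can be checked without a driver or an environment. The sketch below copies the logic into a plain Python class and feeds it hand-written steps. `FakeStep` and `MaxScoreTracker` are hypothetical stand-ins that I made up for this check; they are not part of TF-Agents, and a real trajectory carries more fields than these three.

```python
from collections import namedtuple

import numpy as np

# Hypothetical stand-in for a trajectory: only the values the metric reads.
FakeStep = namedtuple('FakeStep', ['reward', 'discount', 'last'])

class MaxScoreTracker:
    """Plain-Python copy of the metric's bookkeeping, for illustration only."""

    def __init__(self):
        self.rewards = []
        self.discounts = []
        self.max_discounted_reward = None

    def call(self, step):
        self.rewards.append(step.reward)
        self.discounts.append(step.discount)
        if step.last:
            # Same weighting as in the metric: shift the discounts right by one.
            weights = [1.0] + self.discounts[:-1]
            score = float(np.sum(np.multiply(self.rewards, weights)))
            if self.max_discounted_reward is None or score > self.max_discounted_reward:
                self.max_discounted_reward = score
            # Start the next episode with empty lists.
            self.rewards = []
            self.discounts = []

    def result(self):
        return self.max_discounted_reward

tracker = MaxScoreTracker()
episodes = [
    [FakeStep(1.0, 0.5, False), FakeStep(2.0, 0.0, True)],  # 1.0 + 2.0*0.5 = 2.0
    [FakeStep(4.0, 0.5, False), FakeStep(2.0, 0.0, True)],  # 4.0 + 2.0*0.5 = 5.0
    [FakeStep(0.0, 0.5, False), FakeStep(1.0, 0.0, True)],  # 0.0 + 1.0*0.5 = 0.5
]
for episode in episodes:
    for step in episode:
        tracker.call(step)

print(tracker.result())  # 5.0
```

The third episode scores lower than the second one, so the stored maximum stays at 5.0, and the per-episode lists are empty again after every finished episode.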

I want to use my metric as a TensorFlow metric, so I have to wrap it in a class that extends TFPyMetric.

class TFMaxEpisodeScoreMetric(tf_py_metric.TFPyMetric):

  def __init__(self, name='MaxEpisodeScoreMetric', dtype=tf.float32):
    py_metric = MaxEpisodeScoreMetric()

    super(TFMaxEpisodeScoreMetric, self).__init__(
        py_metric=py_metric, name=name, dtype=dtype)

Finally, I can add the metric to the driver’s observers and run the driver.

# tf_env is from the article mentioned in the second paragraph
tf_policy = random_tf_policy.RandomTFPolicy(action_spec=tf_env.action_spec(),
                                            time_step_spec=tf_env.time_step_spec())

max_score = TFMaxEpisodeScoreMetric()

observers = [max_score]
driver = dynamic_episode_driver.DynamicEpisodeDriver(tf_env, tf_policy, observers, num_episodes=1000)

final_time_step, policy_state = driver.run()

print('Max score:', max_score.result().numpy())

Result:

Max score: 1.715

Bartosz Mikulski * data scientist / software/data engineer * conference speaker * organizer of School of A.I. meetups in Poznań * co-founder of Software Craftsmanship Poznan & Poznan Scala User Group