当前位置 : 主页 > 编程语言 > python >

baselines算法库baselines/bench/monitor.py模块分析

来源:互联网 收集:自由互联 发布时间:2022-06-15
baselines算法库baselines/bench/monitor.py模块代码: __all__ = ['Monitor', 'get_monitor_files', 'load_results'] from gym.core import Wrapper import time from glob import glob import csv import os.path as osp import json class Monitor


baselines算法库baselines/bench/monitor.py模块代码:

__all__ = ['Monitor', 'get_monitor_files', 'load_results']

from gym.core import Wrapper
import time
from glob import glob
import csv
import os.path as osp
import json

class Monitor(Wrapper):
EXT = "monitor.csv"
f = None

def __init__(self, env, filename, allow_early_resets=False, reset_keywords=(), info_keywords=()):
Wrapper.__init__(self, env=env)
self.tstart = time.time()
if filename:
self.results_writer = ResultsWriter(filename,
header={"t_start": time.time(), 'env_id' : env.spec and env.spec.id},
extra_keys=reset_keywords + info_keywords
)
else:
self.results_writer = None
self.reset_keywords = reset_keywords
self.info_keywords = info_keywords
self.allow_early_resets = allow_early_resets
self.rewards = None
self.needs_reset = True
self.episode_rewards = []
self.episode_lengths = []
self.episode_times = []
self.total_steps = 0
self.current_reset_info = {} # extra info about the current episode, that was passed in during reset()

def reset(self, **kwargs):
self.reset_state()
for k in self.reset_keywords:
v = kwargs.get(k)
if v is None:
raise ValueError('Expected you to pass kwarg %s into reset'%k)
self.current_reset_info[k] = v
return self.env.reset(**kwargs)

def reset_state(self):
if not self.allow_early_resets and not self.needs_reset:
raise RuntimeError("Tried to reset an environment before done. If you want to allow early resets, wrap your env with Monitor(env, path, allow_early_resets=True)")
self.rewards = []
self.needs_reset = False


def step(self, action):
if self.needs_reset:
raise RuntimeError("Tried to step environment that needs reset")
ob, rew, done, info = self.env.step(action)
self.update(ob, rew, done, info)
return (ob, rew, done, info)

def update(self, ob, rew, done, info):
self.rewards.append(rew)
if done:
self.needs_reset = True
eprew = sum(self.rewards)
eplen = len(self.rewards)
epinfo = {"r": round(eprew, 6), "l": eplen, "t": round(time.time() - self.tstart, 6)}
for k in self.info_keywords:
epinfo[k] = info[k]
self.episode_rewards.append(eprew)
self.episode_lengths.append(eplen)
self.episode_times.append(time.time() - self.tstart)
epinfo.update(self.current_reset_info)
if self.results_writer:
self.results_writer.write_row(epinfo)
assert isinstance(info, dict)
if isinstance(info, dict):
info['episode'] = epinfo

self.total_steps += 1

def close(self):
super(Monitor, self).close()
if self.f is not None:
self.f.close()

def get_total_steps(self):
return self.total_steps

def get_episode_rewards(self):
return self.episode_rewards

def get_episode_lengths(self):
return self.episode_lengths

def get_episode_times(self):
return self.episode_times

class LoadMonitorResultsError(Exception):
pass


class ResultsWriter(object):
def __init__(self, filename, header='', extra_keys=()):
self.extra_keys = extra_keys
assert filename is not None
if not filename.endswith(Monitor.EXT):
if osp.isdir(filename):
filename = osp.join(filename, Monitor.EXT)
else:
filename = filename + "." + Monitor.EXT
self.f = open(filename, "wt")
if isinstance(header, dict):
header = '# {} \n'.format(json.dumps(header))
self.f.write(header)
self.logger = csv.DictWriter(self.f, fieldnames=('r', 'l', 't')+tuple(extra_keys))
self.logger.writeheader()
self.f.flush()

def write_row(self, epinfo):
if self.logger:
self.logger.writerow(epinfo)
self.f.flush()


def get_monitor_files(dir):
return glob(osp.join(dir, "*" + Monitor.EXT))

def load_results(dir):
import pandas
monitor_files = (
glob(osp.join(dir, "*monitor.json")) +
glob(osp.join(dir, "*monitor.csv"))) # get both csv and (old) json files
if not monitor_files:
raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))
dfs = []
headers = []
for fname in monitor_files:
with open(fname, 'rt') as fh:
if fname.endswith('csv'):
firstline = fh.readline()
if not firstline:
continue
assert firstline[0] == '#'
header = json.loads(firstline[1:])
df = pandas.read_csv(fh, index_col=None)
headers.append(header)
elif fname.endswith('json'): # Deprecated json format
episodes = []
lines = fh.readlines()
header = json.loads(lines[0])
headers.append(header)
for line in lines[1:]:
episode = json.loads(line)
episodes.append(episode)
df = pandas.DataFrame(episodes)
else:
assert 0, 'unreachable'
df['t'] += header['t_start']
dfs.append(df)
df = pandas.concat(dfs)
df.sort_values('t', inplace=True)
df.reset_index(inplace=True)
df['t'] -= min(header['t_start'] for header in headers)
df.headers = headers # HACK to preserve backwards compatibility
return df

View Code

baselines是开源的reinforcement leanring算法库,作为最早开源最权威的reinforcement learning算法库由于多年没有进行维护了已经没有太新的算法被加入到这个算法库了,不过这个算法库之后的其他算法库虽然有一直被维护但是很多都是自行其是,很难有太过于经过检测 的代码库了,因此个人认为baselines算法库依旧有其意义价值,于是在疫情宅家期间对该算法库做了一定的源码解析,不过在解析的过程中不得不说发现这个开源算法库由于是多人合作完成的虽然被社区接受和认可但是各个模块的编写风格十分迥异,而且很多模块存在造轮子的嫌疑,甚至很多模块代码优化程度不高、逻辑混乱,对此自己也只能说是不得不吐槽一下。

===============================================

该模块的核心代码为:

class Monitor(Wrapper):

该模块的功能就是将gym的env进行 包装,记录下每一个episode的长度,以及每步的step的奖励reward,以及该episode的用时时长,然后将这些信息写到文本中。

由于代码量较大,实现国内较简单,意义价值不大,而且代码较为混乱就不细解析了。

=================================================


上一篇:baselines算法库logger.py模块分析
下一篇:没有了
网友评论