The Road to Passive Income with AI (12) ~ Saving and Evaluating the Model Periodically with Callbacks ~


Introduction

Even an amateur wants to trade strategically

I think there are many amateurs, myself included, who cannot trade strategically because they have no knowledge of stocks or FX.
So I am going to borrow the power of reinforcement learning and try to trade strategically.

The AIncome project

I named the project "AIncome" because the goal is to earn income (Income) with AI.

Previous work

Building the environment for automated-trading reinforcement learning

Previously, I explained reinforcement learning and OpenAI Gym, which is used to simulate it *1. I also tried trading with randomly chosen actions, and even random actions turned out to be profitable *1. From there I moved from random trading to a deep reinforcement learning model with PPO2 *2, visualized training with TensorBoard *3, and accounted for trading fees *4.

Speeding up training

I then increased the number of training steps tenfold, which made the processing time grow considerably *5, so I studied parallelization *6 and applied it to FX data *7 and Bitcoin data *8.

Building the environment for long-running training

Just as the results were looking good and I was about to run training over a long period, I hit several problems: additional training was not possible (cleared in the previous post *9), the model could not be saved periodically, and TensorBoard was acting strangely.
In this post and the following ones, I want to clear these problems one by one.

Overview of this post

Using the LunarLander sample environment, I will build a setup that periodically saves and evaluates the model.

What you need

A Google Colab environment


Official documentation

Examples — Stable Baselines 2.10.2 documentation

Callback

A callback is called periodically during training, which lets us save the model or evaluate it.
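
As a rough mental model (a minimal sketch of my own with a made-up class name, not code from this project), a callback is just a class derived from BaseCallback whose _on_step hook fires at every training step, so "do something every N steps" boils down to checking self.n_calls:

from stable_baselines.common.callbacks import BaseCallback

class EveryNStepsCallback(BaseCallback):
    """Minimal sketch: do some periodic work every `freq` training steps."""
    def __init__(self, freq=10000, verbose=0):
        super(EveryNStepsCallback, self).__init__(verbose)
        self.freq = freq

    def _on_step(self) -> bool:
        # BaseCallback increments self.n_calls on every training step
        if self.n_calls % self.freq == 0:
            # e.g. self.model.save(...) or an evaluation run would go here
            print("reached step {}".format(self.num_timesteps))
        return True  # returning False stops training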

CheckpointCallback

This is a callback that saves the model periodically.
The default CheckpointCallback worked fine as-is, so I did not modify it.
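
For reference, instantiating it is a one-liner; the save_freq and save_path below are the values used later in this article:

from stable_baselines.common.callbacks import CheckpointCallback

# Save the model every 10,000 steps into ./logs/
# (the files should come out as <name_prefix>_<num_timesteps>_steps.zip)
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path='./logs/',
                                         name_prefix='rl_model', verbose=2)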

EvalCallback

This callback evaluates the model periodically and writes the results as an npz file.
By default it writes to the same file name every time, so past evaluation results could not be compared.
I solved this by putting the step count into the file name each time:

            print(">>> log_path >>>")
            self.log_path2 = self.log_path + "-{}".format(self.num_timesteps)
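
Since np.savez appends the .npz extension itself, every evaluation now lands in its own file such as evaluations-10000.npz, evaluations-20000.npz, and so on. A tiny sketch of just the naming (the values below are made up):

import os
import numpy as np

log_path = './logs/results/evaluations'   # value set in EvalCallback2.__init__
num_timesteps = 10000                     # example step count

os.makedirs(os.path.dirname(log_path), exist_ok=True)
log_path2 = log_path + "-{}".format(num_timesteps)
# np.savez adds .npz, so this writes ./logs/results/evaluations-10000.npz
np.savez(log_path2, timesteps=[num_timesteps], results=[[0.0]], ep_lengths=[[200]])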

Below is the full code of the modified EvalCallback (EvalCallback2).

class EvalCallback2(EventCallback):
    """
    Callback for evaluating an agent.

    :param eval_env: (Union[gym.Env, VecEnv]) The environment used for initialization
    :param callback_on_new_best: (Optional[BaseCallback]) Callback to trigger
        when there is a new best model according to the `mean_reward`
    :param n_eval_episodes: (int) The number of episodes to test the agent
    :param eval_freq: (int) Evaluate the agent every eval_freq call of the callback.
    :param log_path: (str) Path to a folder where the evaluations (`evaluations.npz`)
        will be saved. It will be updated at each evaluation.
    :param best_model_save_path: (str) Path to a folder where the best model
        according to performance on the eval env will be saved.
    :param deterministic: (bool) Whether the evaluation should
        use stochastic or deterministic actions.
    :param render: (bool) Whether to render or not the environment during evaluation
    :param verbose: (int)
    """
    def __init__(self, eval_env: Union[gym.Env, VecEnv],
                 callback_on_new_best: Optional[BaseCallback] = None,
                 n_eval_episodes: int = 5,
                 eval_freq: int = 10000,
                 log_path: str = None,
                 best_model_save_path: str = None,
                 deterministic: bool = True,
                 render: bool = False,
                 verbose: int = 1,
                 name_prefix: str = None):

        super(EvalCallback2, self).__init__(callback_on_new_best, verbose=verbose)
        self.n_eval_episodes = n_eval_episodes
        self.eval_freq = eval_freq
        self.best_mean_reward = -np.inf
        self.last_mean_reward = -np.inf
        self.deterministic = deterministic
        self.render = render
        self.name_prefix = name_prefix

        # Convert to VecEnv for consistency
        if not isinstance(eval_env, VecEnv):
            eval_env = DummyVecEnv([lambda: eval_env])

        assert eval_env.num_envs == 1, "You must pass only one environment for evaluation"

        self.eval_env = eval_env
        self.best_model_save_path = best_model_save_path
        # Logs will be written in `evaluations.npz`
        if log_path is not None:
            log_path = os.path.join(log_path, 'evaluations')
        self.log_path = log_path
        self.evaluations_results = []
        self.evaluations_timesteps = []
        self.evaluations_length = []

    def _init_callback(self):
        # Does not work in some corner cases, where the wrapper is not the same
        if not type(self.training_env) is type(self.eval_env):
            warnings.warn("Training and eval env are not of the same type: "
                          "{} != {}".format(self.training_env, self.eval_env))

        # Create folders if needed
        if self.best_model_save_path is not None:
            os.makedirs(self.best_model_save_path, exist_ok=True)
        if self.log_path is not None:
            os.makedirs(os.path.dirname(self.log_path), exist_ok=True)

    def _on_step(self) -> bool:

        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            # Sync training and eval env if there is VecNormalize
            sync_envs_normalization(self.training_env, self.eval_env)

            episode_rewards, episode_lengths = evaluate_policy(self.model, self.eval_env,
                                                               n_eval_episodes=self.n_eval_episodes,
                                                               render=self.render,
                                                               deterministic=self.deterministic,
                                                               return_episode_rewards=True)

            print(">>> log_path >>>")
            self.log_path2 = self.log_path + "-{}".format(self.num_timesteps)
            print(self.log_path)
            print(self.log_path2)

            if self.log_path2 is not None:
                self.evaluations_timesteps.append(self.num_timesteps)
                self.evaluations_results.append(episode_rewards)
                self.evaluations_length.append(episode_lengths)
                np.savez(self.log_path2, timesteps=self.evaluations_timesteps,
                         results=self.evaluations_results, ep_lengths=self.evaluations_length)

            print(">>> episode_rewards >>>")
            print(episode_rewards)

            mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
            mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(episode_lengths)
            # Keep track of the last evaluation, useful for classes that derive from this callback
            self.last_mean_reward = mean_reward

            if self.verbose > 0:
                print("Eval num_timesteps={}, "
                      "episode_reward={:.2f} +/- {:.2f}".format(self.num_timesteps, mean_reward, std_reward))
                print("Episode length: {:.2f} +/- {:.2f}".format(mean_ep_length, std_ep_length))

            if mean_reward > self.best_mean_reward:
                if self.verbose > 0:
                    print("New best mean reward!")
                if self.best_model_save_path is not None:
                    self.model.save(os.path.join(self.best_model_save_path, 'best_model'))
                self.best_mean_reward = mean_reward
                # Trigger callback if needed
                if self.callback is not None:
                    return self._on_event()

        return True
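
For reference, here is how the class gets instantiated in the training loop further below (same paths and eval_freq as in the rest of this article); this is just a sketch, the real call also passes a name_prefix:

import gym

eval_env = gym.make('LunarLander-v2')
eval_callback = EvalCallback2(eval_env,
                              best_model_save_path='./logs/best_model',
                              log_path='./logs/results',
                              eval_freq=10000,
                              verbose=2)
# later: model.learn(..., callback=CallbackList([checkpoint_callback, eval_callback]))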

ProgressBarCallback

I wrote this because it was unclear how much training time remained.
However, it printed a new line on every update and I suspected it added overhead to the training, so in the end I am not using it. I include it here for reference.

class ProgressBarCallback(BaseCallback):
    """
    A custom callback that derives from ``BaseCallback``.

    :param verbose: (int) Verbosity level 0: not output 1: info 2: debug
    """
    def __init__(self, total_timesteps=100000, verbose=0):
        super(ProgressBarCallback, self).__init__(verbose)
        self.total_timesteps = total_timesteps
        self.pbar = tqdm(total=self.total_timesteps)

    def _on_step(self) -> bool:
        # Move the bar to the current call count and redraw
        self.pbar.n = self.n_calls
        self.pbar.update(0)
        return True

    def _on_training_end(self) -> None:
        self.pbar.n = self.total_timesteps
        self.pbar.update(0)
        self.pbar.close()

Training, Saving, Loading + Callback Code

This is the code from the previous article with the callback parts added.

Installing packages

OpenAI Gym and related packages

!pip install "gym==0.19.0"
#!pip install stable-baselines[mpi]
!apt install swig cmake libopenmpi-dev zlib1g-dev
!pip install stable-baselines[mpi]==2.10.2 box2d box2d-kengz

tensorflow

# Stable Baselines only supports tensorflow 1.x for now
!pip uninstall -y tensorflow-gpu
!pip uninstall -y tensorflow
!pip install tensorflow-gpu==1.14.0

tensorboard

!pip uninstall tensorboard-plugin-wit --yes

Import policy, RL agent, ...

import gym
import numpy as np
from tqdm import tqdm

import glob
import os

import warnings
import typing
from typing import Union, List, Dict, Any, Optional

from stable_baselines import DQN
from stable_baselines import PPO2
from stable_baselines import ACKTR
from stable_baselines import A2C

from stable_baselines.common.vec_env import VecEnv, sync_envs_normalization, DummyVecEnv
from stable_baselines.common.evaluation import evaluate_policy

warnings.filterwarnings('ignore')

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Callback-related imports
from stable_baselines.common.callbacks import CallbackList, CheckpointCallback, EvalCallback, EventCallback
from stable_baselines.common.callbacks import BaseCallback

Create the Gym env and instantiate the agent

For this example, we will use the Lunar Lander environment.


"Landing outside landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. Four discrete actions available: do nothing, fire left orientation engine, fire main engine, fire right orientation engine. "


Lunar Lander environment: https://gym.openai.com/envs/LunarLander-v2/

Lunar Lander

Note: vectorized environments make it easy to multiprocess training. In this example, we are using only one process, hence the DummyVecEnv.


We chose the MlpPolicy because the input of LunarLander is a feature vector, not images.


The type of action to use (discrete/continuous) will be automatically deduced from the environment action space

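To see concretely what "feature vector" and "action space" mean here, you can simply print the spaces; for LunarLander-v2 the observation is an 8-dimensional Box and the actions are Discrete(4):

import gym

env = gym.make('LunarLander-v2')
print(env.observation_space)  # 8-dimensional Box -> feature vector, hence MlpPolicy
print(env.action_space)       # Discrete(4) -> discrete actions, so DQN also applies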

Initial setting

Create dir

# Create the log folders
log_dir = './logs/'
datasets_dir = '../datasets/'

os.makedirs(log_dir, exist_ok=True)
os.makedirs(datasets_dir, exist_ok=True)

Train setting

total_timesteps = 100000

Callback setting

save_freq = 10000
eval_freq = 10000

Callback

CheckpointCallback

The default CheckpointCallback worked fine as-is, so I did not modify it.

EvalCallback

class EvalCallback2(EventCallback):
    """
    Callback for evaluating an agent.

    :param eval_env: (Union[gym.Env, VecEnv]) The environment used for initialization
    :param callback_on_new_best: (Optional[BaseCallback]) Callback to trigger
        when there is a new best model according to the `mean_reward`
    :param n_eval_episodes: (int) The number of episodes to test the agent
    :param eval_freq: (int) Evaluate the agent every eval_freq call of the callback.
    :param log_path: (str) Path to a folder where the evaluations (`evaluations.npz`)
        will be saved. It will be updated at each evaluation.
    :param best_model_save_path: (str) Path to a folder where the best model
        according to performance on the eval env will be saved.
    :param deterministic: (bool) Whether the evaluation should
        use stochastic or deterministic actions.
    :param render: (bool) Whether to render or not the environment during evaluation
    :param verbose: (int)
    """
    def __init__(self, eval_env: Union[gym.Env, VecEnv],
                 callback_on_new_best: Optional[BaseCallback] = None,
                 n_eval_episodes: int = 5,
                 eval_freq: int = 10000,
                 log_path: str = None,
                 best_model_save_path: str = None,
                 deterministic: bool = True,
                 render: bool = False,
                 verbose: int = 1,
                 name_prefix: str = None):

        super(EvalCallback2, self).__init__(callback_on_new_best, verbose=verbose)
        self.n_eval_episodes = n_eval_episodes
        self.eval_freq = eval_freq
        self.best_mean_reward = -np.inf
        self.last_mean_reward = -np.inf
        self.deterministic = deterministic
        self.render = render
        self.name_prefix = name_prefix

        # Convert to VecEnv for consistency
        if not isinstance(eval_env, VecEnv):
            eval_env = DummyVecEnv([lambda: eval_env])

        assert eval_env.num_envs == 1, "You must pass only one environment for evaluation"

        self.eval_env = eval_env
        self.best_model_save_path = best_model_save_path
        # Logs will be written in `evaluations.npz`
        if log_path is not None:
            log_path = os.path.join(log_path, 'evaluations')
        self.log_path = log_path
        self.evaluations_results = []
        self.evaluations_timesteps = []
        self.evaluations_length = []

    def _init_callback(self):
        # Does not work in some corner cases, where the wrapper is not the same
        if not type(self.training_env) is type(self.eval_env):
            warnings.warn("Training and eval env are not of the same type: "
                          "{} != {}".format(self.training_env, self.eval_env))

        # Create folders if needed
        if self.best_model_save_path is not None:
            os.makedirs(self.best_model_save_path, exist_ok=True)
        if self.log_path is not None:
            os.makedirs(os.path.dirname(self.log_path), exist_ok=True)

    def _on_step(self) -> bool:

        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            # Sync training and eval env if there is VecNormalize
            sync_envs_normalization(self.training_env, self.eval_env)

            episode_rewards, episode_lengths = evaluate_policy(self.model, self.eval_env,
                                                               n_eval_episodes=self.n_eval_episodes,
                                                               render=self.render,
                                                               deterministic=self.deterministic,
                                                               return_episode_rewards=True)

            print(">>> log_path >>>")
            self.log_path2 = self.log_path + "-{}".format(self.num_timesteps)
            print(self.log_path)
            print(self.log_path2)

            if self.log_path2 is not None:
                self.evaluations_timesteps.append(self.num_timesteps)
                self.evaluations_results.append(episode_rewards)
                self.evaluations_length.append(episode_lengths)
                np.savez(self.log_path2, timesteps=self.evaluations_timesteps,
                         results=self.evaluations_results, ep_lengths=self.evaluations_length)

            print(">>> episode_rewards >>>")
            print(episode_rewards)

            mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
            mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(episode_lengths)
            # Keep track of the last evaluation, useful for classes that derive from this callback
            self.last_mean_reward = mean_reward

            if self.verbose > 0:
                print("Eval num_timesteps={}, "
                      "episode_reward={:.2f} +/- {:.2f}".format(self.num_timesteps, mean_reward, std_reward))
                print("Episode length: {:.2f} +/- {:.2f}".format(mean_ep_length, std_ep_length))

            if mean_reward > self.best_mean_reward:
                if self.verbose > 0:
                    print("New best mean reward!")
                if self.best_model_save_path is not None:
                    self.model.save(os.path.join(self.best_model_save_path, 'best_model'))
                self.best_mean_reward = mean_reward
                # Trigger callback if needed
                if self.callback is not None:
                    return self._on_event()

        return True

ProgressBarCallback

class ProgressBarCallback(BaseCallback):
    """
    A custom callback that derives from ``BaseCallback``.

    :param verbose: (int) Verbosity level 0: not output 1: info 2: debug
    """
    def __init__(self, total_timesteps=100000, verbose=0):
        super(ProgressBarCallback, self).__init__(verbose)
        self.total_timesteps = total_timesteps
        self.pbar = tqdm(total=self.total_timesteps)

    def _on_step(self) -> bool:
        # Move the bar to the current call count and redraw
        self.pbar.n = self.n_calls
        self.pbar.update(0)
        return True

    def _on_training_end(self) -> None:
        self.pbar.n = self.total_timesteps
        self.pbar.update(0)
        self.pbar.close()

Create model

Create the environment and the model.
The cryptocurrency-trading version uses PPO2, but its arguments and various other details are different and changing them would be a hassle, so here I use DQN.

env = gym.make('LunarLander-v2')
model = DQN('MlpPolicy', env, learning_rate=1e-3, prioritized_replay=True, verbose=0, tensorboard_log=log_dir, full_tensorboard_log=True)
#policy_kwargs = dict(net_arch=[64, 'lstm', dict(vf=[128, 128, 128], pi=[64, 64])])
#model = PPO2('MlpLstmPolicy', env, verbose=0, policy_kwargs=policy_kwargs, nminibatches=1, tensorboard_log=log_dir)

Define evaluate func

Define the evaluation function. It runs the agent for a number of steps and averages the episode rewards.

def evaluate(model, num_steps=1000):
  """
  Evaluate a RL agent
  :param model: (BaseRLModel object) the RL Agent
  :param num_steps: (int) number of timesteps to evaluate it
  :return: (float) Mean reward for the last 100 episodes
  """
  episode_rewards = [0.0]
  obs = env.reset()
  for i in tqdm(range(num_steps)):
      # _states are only useful when using LSTM policies
      action, _states = model.predict(obs)

      obs, reward, done, info = env.step(action)

      # Stats
      episode_rewards[-1] += reward
      if done:
          obs = env.reset()
          episode_rewards.append(0.0)
  # Compute mean reward for the last 100 episodes
  mean_100ep_reward = round(np.mean(episode_rewards[-100:]), 1)
  print("Mean reward:", mean_100ep_reward, "Num episodes:", len(episode_rewards))

  return mean_100ep_reward
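
As an aside, stable_baselines also ships an evaluation helper, evaluate_policy (the same function EvalCallback2 calls internally), so a quick sanity check could also look like this; n_eval_episodes=10 is just an example value:

from stable_baselines.common.evaluation import evaluate_policy

mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print("mean reward: {:.1f} +/- {:.1f}".format(mean_reward, std_reward))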

First, let's evaluate the agent before any training.

# Random Agent, before training
mean_reward_before_train = evaluate(model, num_steps=10000)
    100%|██████████| 10000/10000 [00:13<00:00, 728.47it/s]

    Mean reward: -587.2 Num episodes: 144

Save & Load & Learn

From here on, I actually wire in the callbacks and build the mechanism that periodically saves and evaluates the model.

Loop train

This is where the training loop starts.


# ====================================================
# Init 
#
resume_idx = 0
mean_reward_list = []

# ====================================================
# Create Env & Model
#
env = gym.make('LunarLander-v2')
model = DQN('MlpPolicy', env, learning_rate=1e-3, prioritized_replay=True, verbose=0, tensorboard_log=log_dir, full_tensorboard_log=True)

for i in tqdm(range(resume_idx,  5)):

  # ====================================================
  # callback
  #
  # -------
  # eval callback
  #
  eval_callback = EvalCallback2(env, best_model_save_path='./logs/best_model',
                              log_path='./logs/results', eval_freq=eval_freq, verbose=2, name_prefix='rl_model_{:05d}_'.format(i))
  # -------
  # checkpoint callback
  #
  checkpoint_callback = CheckpointCallback(save_freq=save_freq, save_path='./logs/', name_prefix='rl_model_{:05d}'.format(i), verbose=2)

  # -------
  # merge callback
  #
  #callback = CallbackList([checkpoint_callback, eval_callback, ProgressBarCallback(total_timesteps)])
  callback = CallbackList([checkpoint_callback, eval_callback, ])

  if i > 0:
    # ====================================================
    # Model setting
    #
    model = DQN.load("dqn_lunar_{:07d}".format(i-1))
    model.set_env(env)

  #tb_log_name = "sampleE{}v".format(i)
  tb_log_name = "sampleG"
  print("tb_log_name : {}".format(tb_log_name))

  model.learn(total_timesteps=total_timesteps , log_interval=1, tb_log_name=tb_log_name, reset_num_timesteps=False, callback=callback)
  mean_reward = evaluate(model, num_steps=10000)
  print("------------")
  print(mean_reward)
  mean_reward_list.append(mean_reward)

  # ====================================================
  # Save model
  #
  model.save("dqn_lunar_{:07d}".format(i))

Load evaluation

Get file list

Get the list of paths to the npz files that contain the evaluation results.

eva_files = glob.glob("./logs/results/*.npz")
eva_files
    ['./logs/results/evaluations-460000.npz',
     './logs/results/evaluations-180000.npz',
     './logs/results/evaluations-480000.npz',
     './logs/results/evaluations-250000.npz',
     './logs/results/evaluations-170000.npz',
     './logs/results/evaluations-210000.npz',
     './logs/results/evaluations-190000.npz',
     './logs/results/evaluations-420000.npz',
     './logs/results/evaluations-430000.npz',
     './logs/results/evaluations-370000.npz',
     './logs/results/evaluations-50000.npz',
     './logs/results/evaluations-450000.npz',
     './logs/results/evaluations-20000.npz',
     './logs/results/evaluations-490000.npz',
     './logs/results/evaluations-280000.npz',
     './logs/results/evaluations-130000.npz',
     './logs/results/evaluations-400000.npz',
     './logs/results/evaluations-150000.npz',
     './logs/results/evaluations-240000.npz',
     './logs/results/evaluations-60000.npz',
     './logs/results/evaluations-390000.npz',
     './logs/results/evaluations-440000.npz',
     './logs/results/evaluations-120000.npz',
     './logs/results/evaluations-340000.npz',
     './logs/results/evaluations-220000.npz',
     './logs/results/evaluations-300000.npz',
     './logs/results/evaluations-100000.npz',
     './logs/results/evaluations-380000.npz',
     './logs/results/evaluations-350000.npz',
     './logs/results/evaluations-290000.npz',
     './logs/results/evaluations-200000.npz',
     './logs/results/evaluations-110000.npz',
     './logs/results/evaluations-140000.npz',
     './logs/results/evaluations-270000.npz',
     './logs/results/evaluations-80000.npz',
     './logs/results/evaluations-310000.npz',
     './logs/results/evaluations-230000.npz',
     './logs/results/evaluations-320000.npz',
     './logs/results/evaluations-40000.npz',
     './logs/results/evaluations-470000.npz',
     './logs/results/evaluations-500000.npz',
     './logs/results/evaluations-410000.npz',
     './logs/results/evaluations-360000.npz',
     './logs/results/evaluations-70000.npz',
     './logs/results/evaluations-330000.npz',
     './logs/results/evaluations-90000.npz',
     './logs/results/evaluations-260000.npz',
     './logs/results/evaluations-30000.npz',
     './logs/results/evaluations-160000.npz',
     './logs/results/evaluations-10000.npz']

Load npz file

Load the npz files one by one.

step_list = []
reward_list = []

for file_path in eva_files:
  result = np.load(file_path)
  print("--------------------------")
  step_name = file_path.split("-")[-1].split(".")[0]
  print(step_name)
  print(list(result))
  print(np.mean(result['results']))

  step_list.append(int(step_name))
  reward_list.append(np.mean(result['results']))

Adjust data

Shape the results into a DataFrame.

import pandas as pd

df_eval = pd.DataFrame(data={"reward":reward_list, "step":step_list})
df_eval = df_eval.sort_values('step')
df_eval.head(5)
        reward   step
49  -86.500504  10000
12   20.095142  20000
47   12.921559  30000
38  -41.613075  40000
10  -80.080345  50000
df_eval.tail(5)

Plot evaluation

Plot the evaluation results.

import seaborn as sns
import matplotlib.pyplot as plt
plt.figure(figsize=(6*2,4))
sns.lineplot(data=df_eval, x='step', y='reward')
    <matplotlib.axes._subplots.AxesSubplot at 0x7f0aeada89d0>

Tensorboard

# Load the TensorBoard notebook extension
%load_ext tensorboard
#!kill 6654
%tensorboard --logdir logs/

Google Colab notebook

AIncome/AutoTrade12_callback.ipynb at main · HamaruKi0303/AIncome

Closing

I was able to periodically save and evaluate the model without issues.

Next time I will finally tackle the TensorBoard-related bugs.
