Artificial Intelligence: Reinforcement Learning - Deep Convolutional Q-Learning (DCQN) (Part 37)

Check out the Deep Q-Learning blog before starting this one.

In Deep Q-Learning, we passed our states to an ANN,

but that is not how it works all the time. We use our vision as well.

To solve problems that we can see, we need a neural network that deals with images, and that is a CNN.

This is the CNN part we will add to our setup to solve the issue.

So, this is how it will look once you give it a shooting game to solve.

Eligibility Trace (N-step Q-learning)

Now, assume that we have an agent (a robot) and two states: the state it is currently in, and the state it will check and move to.

Now, we will check the reward and, depending on it, we will move to the second state (box).

This reward can be positive or negative (a penalty).

In this way, it will go through all of the cells while checking the rewards.

But there is another way it can follow: rather than going to one state at a time, it can check multiple states and their rewards, and then decide which of those states are good (eligible to move to) or bad (eligible to avoid).

From this image, you can guess that this agent checks these 4 states and their rewards, and then

it has an idea of which rewards were good and which were bad. It will then decide which state to move to or not.
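Here is a minimal sketch of that n-step idea, assuming we already collected a handful of rewards along a path and an estimated value of the state reached at the end; the numbers below are made-up placeholders, not part of the Pac-Man code:

# Minimal sketch of an n-step return (all numbers are made-up placeholders)
discount_factor = 0.99
rewards = [1.0, 0.0, -1.0, 2.0] # rewards observed over n = 4 steps ahead
value_of_last_state = 5.0 # estimated Q-value of the state reached after those n steps

# n-step return: r1 + g*r2 + g^2*r3 + ... + g^(n-1)*rn + g^n * V(s_n)
n_step_return = sum((discount_factor ** i) * r for i, r in enumerate(rewards))
n_step_return += (discount_factor ** len(rewards)) * value_of_last_state

print(n_step_return) # the agent compares such returns to decide which path of states is "eligible"

Compared with checking one state at a time, the agent now judges a whole chunk of n steps at once before committing to a move.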

Let's code this up.

We are going to solve the Ms. Pac-Man problem.

The code is 90% the same as the DQN blog we did earlier, so make sure to open that code and go through it line by line.

There are various variants of this game, and some of them need serious computational power to solve.

But we want to make it easier for you to play along, and thus we will choose this one.

Although it says "Deterministic", it is not fully deterministic. The monsters will take intelligently stochastic actions, but part of the environment will be deterministic.

Installing Gymnasium

!pip install gymnasium

!pip install "gymnasium[atari, accept-rom-license]"

!apt-get install -y swig

!pip install "gymnasium[box2d]"

Importing the libraries

import os

import random

import numpy as np #numpy

import torch #pytorch

import torch.nn as nn #importing neural networks

import torch.optim as optim #importing optimizer

import torch.nn.functional as F #to use functions

from collections import deque

from torch.utils.data import DataLoader, TensorDataset

Creating the architecture of the Neural Network

class Network(nn.Module):

def __init__(self, action_size, seed=42): #we no longer have a state_size parameter

super(Network, self).__init__() #just to activate inheritance

self.seed = torch.manual_seed(seed) #fixing the random seed for reproducibility

Now, we need to add the CNN layers.

self.conv1=nn.Conv2d(in_channels=3, out_channels=32, kernel_size=8, stride=4)

Since we are using RGB images, the input has 3 channels; we want 32 output channels, a good kernel size is 8x8, and a good stride value is 4.

Note: all of these values were chosen based on our experiments.

Batch normalization operation

self.bn1=nn.BatchNorm2d(32) #we had 32 channels of feature maps (output channel)

Now, we will have three more series of a convolution followed by a batch normalization operation.

#Note: the input and output channels are going to increase, but the kernel size and stride are going to decrease (as the feature maps get smaller)

#second layer

self.conv2=nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2) #connected to the 1st layer, so the input here is 32; 64 output channels turned out to be a good choice

self.bn2=nn.BatchNorm2d(64)

#3rd layer

self.conv3=nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)

# 64 inputs from conv2 and 64 outputs as well, because after experimenting this still gives good results

self.bn3=nn.BatchNorm2d(64)

#4th layer

self.conv4=nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1)

#we want to gradually increase the depth, meaning the number of channels, over the convolutions. Since we didn't do it for the previous layer, we do it here and use 128 output channels

self.bn4=nn.BatchNorm2d(128)

#first full connection

self.fc1 = nn.Linear(10*10*128,512)

#input feature will be the number of output features resulting from flattening all the previous convolutions.

The output size of a convolutional layer is the input size minus the kernel size plus two times the padding, all divided by the stride, plus one. You apply this formula to get the output size of the first convolutional layer, apply it again for the second, again for the third, and finally for the fourth, which gives 10x10 feature maps over 128 channels, i.e. 10*10*128 input features. After some experimentation and hyperparameter tuning, 512 is a good number of output features (artificial neurons) for this first fully connected layer.
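If you want to verify the 10*10*128 number yourself, here is a small standalone sketch (separate from the Network class) that applies the formula above to our four convolutional layers, assuming the 128x128 input frames we will create later in the preprocessing step and padding 0 everywhere:

def conv_output_size(input_size, kernel_size, stride, padding=0):
    # (input size - kernel size + 2 * padding) / stride + 1
    return (input_size - kernel_size + 2 * padding) // stride + 1

size = 128 # frames are resized to 128x128 in the preprocessing step
size = conv_output_size(size, kernel_size=8, stride=4) # conv1 -> 31
size = conv_output_size(size, kernel_size=4, stride=2) # conv2 -> 14
size = conv_output_size(size, kernel_size=3, stride=1) # conv3 -> 12
size = conv_output_size(size, kernel_size=3, stride=1) # conv4 -> 10

print(size * size * 128) # 10 * 10 * 128 = 12800 input features for fc1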

self.fc2 = nn.Linear(512,256) #the input is the output of fc1 (512), and 256 output neurons turned out to be a good number after various experiments

self.fc3 = nn.Linear(256, action_size) #final fully connected layer: the input is the output of fc2, and the output is the action size

Now, the forward method

def forward(self, state):

#forward propagate from image to first convolutional layer

x= F.relu(self.bn1(self.conv1(state))) #self.conv1(state) forwards the image to the first convolutional layer, then we pass the result to the batch normalization layer with self.bn1(), and finally we activate it with F.relu()

#forward from first convolutional layer to second

x= F.relu(self.bn2(self.conv2(x)))

#forward from second convolution layer to third

x= F.relu(self.bn3(self.conv3(x)))

#forward from third convolution layer to fourth

x= F.relu(self.bn4(self.conv4(x)))

#we just need a little reshape of the tensor in order to flatten it

x=x.view(x.size(0),-1) #first dimension corresponding to the batch remains the same and the other dimensions are flattened

#now let's forward propagate the flattened signal x to the first fully connected layer

x= F.relu(self.fc1(x)) # forward propagate to first fully connected layer fc1 using self.fc1(x) then activate it using F.relu

x= F.relu(self.fc2(x)) # forward propagate to second

x= self.fc3(x) # forward propagate to third

return x

Training the AI

Setting up the environment

import gymnasium as gym #importing gymnasium

env = gym.make('MsPacmanDeterministic-v0',full_action_space=False)

#creating the environment; full_action_space=False basically ensures that the agent uses a simplified set of actions for Ms. Pac-Man

state_shape= env.observation_space.shape #the shape of the observation: height, width and the rgb channels

state_size= env.observation_space.shape[0] #the first dimension of that shape (kept for consistency with the DQN code)

number_actions = env.action_space.n #number of actions

print('State shape:', state_shape) #State shape: (210, 160, 3) rgb channels

print('State size:', state_size)

print('Number of actions:', number_actions) #the simplified action space should be 5, but here it will show 9 because MsPacmanDeterministic actually contains more actions

Initializing the hyperparameters

learning_rate= 5e-4

minibatch_size = 64

discount_factor = 0.99

#no need for a soft update parameter or a separate replay buffer class here

Preprocessing the frames

#we have to pre-process the frames so that the input images can be converted into PyTorch tensors that are accepted by the neural network of our AI

from PIL import Image

from torchvision import transforms #transforms module import

def preprocess_frame(frame):

#frames coming from the Pac-Man game will be converted to PyTorch tensors. We now have one frame from the game in the form of a numpy array, so we first convert it to a PIL Image object

frame=Image.fromarray(frame) #pil image object

#Preprocessing object

preprocess =transforms.Compose([transforms.Resize((128,128)),transforms.ToTensor()])

#the Compose class takes a list of transforms as input. Our raw frames have shape (210, 160, 3), which is heavy to process, so we resize them to 128 by 128 with transforms.Resize((128,128)); then transforms.ToTensor() converts them to PyTorch tensors

return preprocess(frame).unsqueeze(0)

#return the preprocessed frame. Our frames always need to belong to a batch, and in order to add that batch dimension we use the unsqueeze method, which takes one argument: the index where the batch dimension should be inserted. We use 0 so that the batch dimension is the first dimension.
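As a quick sanity check, here is a small usage example of the preprocess_frame function defined above; the random array below only mimics the shape of a raw (210, 160, 3) Pac-Man frame, it is not real game data:

fake_frame = np.random.randint(0, 256, size=(210, 160, 3), dtype=np.uint8) # stand-in for a raw Atari frame
tensor = preprocess_frame(fake_frame)
print(tensor.shape) # torch.Size([1, 3, 128, 128]) -> batch dimension, RGB channels, resized height and width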

Implementing the DCQN class

class Agent(): #creating our agent

def __init__(self, action_size): #no state_size here

self.device=torch.device("cuda" if torch.cuda.is_available() else "cpu")

self.action_size=action_size

self.local_qnetwork=Network(action_size).to(self.device) #new

self.target_qnetwork=Network(action_size).to(self.device) #new

self.optimizer=optim.Adam( self.local_qnetwork.parameters(),lr=learning_rate)

self.memory=deque(maxlen=1000)#new

#no timesteps

Step method: this is the method that will store experiences and decide when to learn from them.

def step(self,state,action,reward,next_state,done):

Here, we have removed the code for the push method and the time step counter.

#REPLACEMENT OF Push method

state=preprocess_frame(state).to(self.device) #preprocess state

next_state=preprocess_frame(next_state).to(self.device) #next preprocess state

self.memory.append((state,action,reward,next_state,done)) #adding to memory
if len(self.memory) > minibatch_size: #new

experiences = random.sample(self.memory, k=minibatch_size) #new

self.learn(experiences, discount_factor)

def act(self,state,epsilon=0.):

state= preprocess_frame(state).to(self.device) #new: the state is now an image rather than the input vector we had in DQN

self.local_qnetwork.eval()

with torch.no_grad():

action_values= self.local_qnetwork(state)

self.local_qnetwork.train()

if random.random() > epsilon:

return np.argmax(action_values.cpu().data.numpy())

else:

return random.choice(np.arange(self.action_size))

def learn(self,experiences,discount_factor):

states,actions, rewards,next_states,dones = zip(*experiences) #new

We can deal with the states either by using np.vstack (which works on numpy arrays or CPU torch tensors) or by using torch.cat. Since our states and next_states already come out of preprocess_frame as PyTorch tensors, torch.cat is the safer choice here; actions, rewards and dones are plain numbers, so we stack those with numpy.

#option 1 (vstack): states = torch.from_numpy(np.vstack(states)).float().to(self.device), and the same for next_states

#option 2 (torch.cat), used here: states and next_states are already pytorch tensors

states = torch.cat(states).to(self.device) #new

next_states = torch.cat(next_states).to(self.device) #new

actions = torch.from_numpy(np.vstack(actions)).long().to(self.device) #new

rewards = torch.from_numpy(np.vstack(rewards)).float().to(self.device) #new

dones = torch.from_numpy(np.vstack(dones).astype(np.uint8)).float().to(self.device) #new

next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)

q_targets = rewards + (discount_factor * next_q_targets * (1 - dones)) #Bellman target: reward plus the discounted max Q-value of the next state, zeroed out for terminal states

q_expected= self.local_qnetwork(states).gather(1,actions)

loss = F.mse_loss(q_expected,q_targets)

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step() #no soft update

Initializing the DCQN agent

agent = Agent(number_actions)

Training the DCQN agent

number_episodes= 2000

max_number_timesteps_per_episode = 10000 #new; increased the maximum number of timesteps per episode for better training

epsilon_starting_value=1.0

epsilon_ending_value= 0.01

epsilon_decay_value = 0.995

epsilon= epsilon_starting_value

scores_on_100_episodes= deque(maxlen=100)

#main

for episode in range(1,number_episodes+1):

state, _ = env.reset()

score=0

for t in range(max_number_timesteps_per_episode):

action= agent.act(state,epsilon)

next_state,reward,done,_,_= env.step(action)

agent.step(state,action,reward,next_state,done)

state=next_state

score+=reward

if done:

break

scores_on_100_episodes.append(score)

epsilon= max(epsilon_ending_value,epsilon_decay_value*epsilon)

print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode,np.mean(scores_on_100_episodes)), end="")

if episode % 100 == 0:

    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes))) #print a permanent line every 100 episodes

if np.mean(scores_on_100_episodes) >= 500.0:

#new: if the average score over the last 100 episodes is at least 500, we say the agent has won and the environment is solved

print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode-100, np.mean(scores_on_100_episodes)))

torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth') #save the trained weights

break

Check out the code to run and visualize the result.

So, we got our desired average of 500 points at episode 733. This training took about 2 hours.

You can then visualize the game.
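If you do not have that visualization code handy, below is a minimal sketch of how it could look, assuming imageio (with its ffmpeg plugin) is installed; the function name, the output file name and the fps value are choices made for this sketch, not part of the original code:

import imageio

def show_video_of_model(agent, env_name='MsPacmanDeterministic-v0'):
    # minimal sketch: play one episode with the trained agent (epsilon = 0) and save the frames as a video
    env = gym.make(env_name, full_action_space=False, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frames.append(env.render()) # rgb_array frame of the current screen
        action = agent.act(state) # greedy action from the trained local Q-network
        state, reward, done, _, _ = env.step(action)
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30) # write the collected frames to an mp4 file

show_video_of_model(agent)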

Done!!
