# pneuma-pygame/Godot/addons/godot_rl_agents/sync.gd
extends Node
# Command-line flags commonly used for headless training: --fixed-fps 2000 --disable-render-loop
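# A minimal launch sketch using those flags (the project path and the
# --headless flag are assumptions; adjust to your own export/project setup):
#   godot --path /path/to/project --headless --fixed-fps 2000 --disable-render-loop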
enum ControlModes { HUMAN, TRAINING, ONNX_INFERENCE }
@export var control_mode: ControlModes = ControlModes.TRAINING
@export_range(1, 10, 1, "or_greater") var action_repeat := 8
@export_range(0, 10, 0.1, "or_greater") var speed_up := 1.0
@export var onnx_model_path := ""
# Onnx model stored for each requested path
var onnx_models: Dictionary
@onready var start_time = Time.get_ticks_msec()
const MAJOR_VERSION := "0"
const MINOR_VERSION := "7"
const DEFAULT_PORT := "11008"
const DEFAULT_SEED := "1"
var stream: StreamPeerTCP = null
var connected = false
var message_center
var should_connect = true
var all_agents: Array
var agents_training: Array
## Policy name of each agent, for use with multi-policy multi-agent RL cases
var agents_training_policy_names: Array[String] = ["shared_policy"]
var agents_inference: Array
var agents_heuristic: Array
## For recording expert demos
var agent_demo_record: Node
## File path for writing recorded trajectories
var expert_demo_save_path: String
## Stores recorded trajectories
var demo_trajectories: Array
## A trajectory includes obs: Array, acts: Array, terminal (set in Python env instead)
var current_demo_trajectory: Array
var need_to_send_obs = false
var args = null
var initialized = false
var just_reset = false
var onnx_model = null
var n_action_steps = 0
var _action_space_training: Array[Dictionary] = []
var _action_space_inference: Array[Dictionary] = []
var _obs_space_training: Array[Dictionary] = []

# Called when the node enters the scene tree for the first time.
func _ready():
    await get_tree().root.ready
    get_tree().set_pause(true)
    _initialize()
    await get_tree().create_timer(1.0).timeout
    get_tree().set_pause(false)

func _initialize():
    _get_agents()
    args = _get_args()
    Engine.physics_ticks_per_second = _get_speedup() * 60
    Engine.time_scale = _get_speedup() * 1.0
    prints(
        "physics ticks",
        Engine.physics_ticks_per_second,
        Engine.time_scale,
        _get_speedup(),
        speed_up
    )
    _set_heuristic("human", all_agents)
    _initialize_training_agents()
    _initialize_inference_agents()
    _initialize_demo_recording()
    _set_seed()
    _set_action_repeat()
    initialized = true

func _initialize_training_agents():
    if agents_training.size() > 0:
        _obs_space_training.resize(agents_training.size())
        _action_space_training.resize(agents_training.size())
        for agent_idx in range(0, agents_training.size()):
            _obs_space_training[agent_idx] = agents_training[agent_idx].get_obs_space()
            _action_space_training[agent_idx] = agents_training[agent_idx].get_action_space()
        connected = connect_to_server()
        if connected:
            _set_heuristic("model", agents_training)
            _handshake()
            _send_env_info()
        else:
            push_warning(
                "Couldn't connect to Python server, using human controls instead. ",
                "Did you start the training server using e.g. `gdrl` from the console?"
            )

func _initialize_inference_agents():
    if agents_inference.size() > 0:
        if control_mode == ControlModes.ONNX_INFERENCE:
            assert(
                FileAccess.file_exists(onnx_model_path),
                "Onnx Model Path set on Sync node does not exist: %s" % onnx_model_path
            )
            onnx_models[onnx_model_path] = ONNXModel.new(onnx_model_path, 1)

        for agent in agents_inference:
            var action_space = agent.get_action_space()
            _action_space_inference.append(action_space)

            var agent_onnx_model: ONNXModel
            if agent.onnx_model_path.is_empty():
                assert(
                    onnx_models.has(onnx_model_path),
                    (
                        "Node %s has no onnx model path set " % agent.get_path()
                        + "and sync node's control mode is not set to OnnxInference. "
                        + "Either add the path to the AIController, "
                        + "or if you want to use the path set on sync node instead, "
                        + "set control mode to OnnxInference."
                    )
                )
                prints(
                    "Info: AIController %s" % agent.get_path(),
                    "has no onnx model path set.",
                    "Using path set on the sync node instead."
                )
                agent_onnx_model = onnx_models[onnx_model_path]
            else:
                if not onnx_models.has(agent.onnx_model_path):
                    assert(
                        FileAccess.file_exists(agent.onnx_model_path),
                        (
                            "Onnx Model Path set on %s node does not exist: %s"
                            % [agent.get_path(), agent.onnx_model_path]
                        )
                    )
                    onnx_models[agent.onnx_model_path] = ONNXModel.new(agent.onnx_model_path, 1)
                agent_onnx_model = onnx_models[agent.onnx_model_path]

            agent.onnx_model = agent_onnx_model
            if not agent_onnx_model.action_means_only_set:
                agent_onnx_model.set_action_means_only(action_space)

        _set_heuristic("model", agents_inference)

func _initialize_demo_recording():
    if agent_demo_record:
        expert_demo_save_path = agent_demo_record.expert_demo_save_path
        assert(
            not expert_demo_save_path.is_empty(),
            "Expert demo save path set in %s is empty." % agent_demo_record.get_path()
        )
        InputMap.add_action("RemoveLastDemoEpisode")
        InputMap.action_add_event(
            "RemoveLastDemoEpisode", agent_demo_record.remove_last_episode_key
        )
        current_demo_trajectory.resize(2)
        current_demo_trajectory[0] = []
        current_demo_trajectory[1] = []
        agent_demo_record.heuristic = "demo_record"

func _physics_process(_delta):
    # Two control paths: human control and agent control.
    # Each agent step: pause tree, send obs, get actions, set actions, unpause tree.
    _demo_record_process()
    if n_action_steps % action_repeat != 0:
        n_action_steps += 1
        return
    n_action_steps += 1
    _training_process()
    _inference_process()
    _heuristic_process()

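# Worked example of the action_repeat gate above: with action_repeat = 8,
# new actions are requested on physics ticks 0, 8, 16, ... and each agent
# simply keeps applying its last action on the 7 ticks in between.
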
func _training_process():
    if connected:
        get_tree().set_pause(true)

        if just_reset:
            just_reset = false
            var obs = _get_obs_from_agents(agents_training)
            var reply = {"type": "reset", "obs": obs}
            _send_dict_as_json_message(reply)
            # This should go straight to getting the action and setting it on
            # the agent; there is no need to perform one physics tick first.
            get_tree().set_pause(false)
            return

        if need_to_send_obs:
            need_to_send_obs = false
            var reward = _get_reward_from_agents()
            var done = _get_done_from_agents()
            #_reset_agents_if_done() # this ensures the new observation is from the next env instance : NEEDS REFACTOR
            var obs = _get_obs_from_agents(agents_training)
            var reply = {"type": "step", "obs": obs, "reward": reward, "done": done}
            _send_dict_as_json_message(reply)

        var _handled = handle_message()

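# Step protocol sketch (message shapes taken from the replies built above;
# per-agent contents depend on each AIController's get_obs/get_reward/get_done):
#   Godot -> Python: {"type": "step", "obs": [...], "reward": [...], "done": [...]}
#   Python -> Godot: {"type": "action", "action": [...]}  (consumed in handle_message)
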
func _inference_process():
    if agents_inference.size() > 0:
        var obs: Array = _get_obs_from_agents(agents_inference)
        var actions = []
        for agent_id in range(0, agents_inference.size()):
            var model: ONNXModel = agents_inference[agent_id].onnx_model
            var action = model.run_inference(obs[agent_id]["obs"], 1.0)
            var action_dict = _extract_action_dict(
                action["output"], _action_space_inference[agent_id], model.action_means_only
            )
            actions.append(action_dict)
        _set_agent_actions(actions, agents_inference)
        _reset_agents_if_done(agents_inference)
        get_tree().set_pause(false)

func _demo_record_process():
    if not agent_demo_record:
        return

    if Input.is_action_just_pressed("RemoveLastDemoEpisode"):
        print("[Sync script][Demo recorder] Removing last recorded episode.")
        demo_trajectories.remove_at(demo_trajectories.size() - 1)
        print("Remaining episode count: %d" % demo_trajectories.size())

    if n_action_steps % agent_demo_record.action_repeat != 0:
        return

    # Get the current obs from the agent
    var obs_dict: Dictionary = agent_demo_record.get_obs()
    assert(
        obs_dict.has("obs"),
        "Demo recorder needs an 'obs' key in get_obs() returned dictionary to record obs from."
    )
    current_demo_trajectory[0].append(obs_dict.obs)

    # Get the action applied for the current obs from the agent
    agent_demo_record.set_action()
    var acts = agent_demo_record.get_action()
    var terminal = agent_demo_record.get_done()

    # Record actions only for non-terminal states
    if terminal:
        agent_demo_record.set_done_false()
    else:
        current_demo_trajectory[1].append(acts)

    if terminal:
        #current_demo_trajectory[2].append(true)
        demo_trajectories.append(current_demo_trajectory.duplicate(true))
        print("[Sync script][Demo recorder] Recorded episode count: %d" % demo_trajectories.size())
        current_demo_trajectory[0].clear()
        current_demo_trajectory[1].clear()

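# Illustrative trajectory layout, as built by the appends above:
#   current_demo_trajectory = [[obs_0, obs_1, ...], [act_0, act_1, ...]]
# Completed episodes are deep-copied into demo_trajectories; the terminal flag
# is reconstructed on the Python side rather than stored here.
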
func _heuristic_process():
    # Heuristic (human-controlled) agents only need their done flags cleared.
    # One call handles the whole list; the original per-agent loop repeated
    # the same whole-list reset redundantly.
    _reset_agents_if_done(agents_heuristic)

func _extract_action_dict(action_array: Array, action_space: Dictionary, action_means_only: bool):
    var index = 0
    var result = {}
    for key in action_space.keys():
        var size = action_space[key]["size"]
        var action_type = action_space[key]["action_type"]
        if action_type == "discrete":
            # Argmax over this action's logits; start at -INF so all-negative
            # logits are handled correctly.
            var largest_logit: float = -INF  # Value of the largest logit for this action
            var largest_logit_idx: int = 0  # Index of the largest logit for this action
            for logit_idx in range(0, size):
                var logit_value = action_array[index + logit_idx]
                if logit_value > largest_logit:
                    largest_logit = logit_value
                    largest_logit_idx = logit_idx
            result[key] = largest_logit_idx  # Index of the largest logit is the discrete action value
            index += size
        elif action_type == "continuous":
            # For continuous actions, we only take the action mean values
            result[key] = clamp_array(action_array.slice(index, index + size), -1.0, 1.0)
            if action_means_only:
                index += size  # model only outputs action means, so we move index by size
            else:
                index += size * 2  # model outputs logstd after action mean, we skip the logstd part
        else:
            assert(
                false,
                'Only "discrete" and "continuous" action types supported. Found: %s action type set.' % action_type
            )
    return result

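# Example with illustrative values: for an action space of
#   {"jump": {"size": 2, "action_type": "discrete"},
#    "move": {"size": 2, "action_type": "continuous"}}
# and a means-only model output [0.1, 1.3, 0.5, -0.2], this returns
#   {"jump": 1, "move": [0.5, -0.2]}
# (argmax of the first two logits, then the clamped continuous means).
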
## For AIControllers that inherit mode from sync, sets the correct mode.
func _set_agent_mode(agent: Node):
    var agent_inherits_mode: bool = agent.control_mode == agent.ControlModes.INHERIT_FROM_SYNC
    if agent_inherits_mode:
        match control_mode:
            ControlModes.HUMAN:
                agent.control_mode = agent.ControlModes.HUMAN
            ControlModes.TRAINING:
                agent.control_mode = agent.ControlModes.TRAINING
            ControlModes.ONNX_INFERENCE:
                agent.control_mode = agent.ControlModes.ONNX_INFERENCE

func _get_agents():
    all_agents = get_tree().get_nodes_in_group("AGENT")
    for agent in all_agents:
        _set_agent_mode(agent)
        if agent.control_mode == agent.ControlModes.TRAINING:
            agents_training.append(agent)
        elif agent.control_mode == agent.ControlModes.ONNX_INFERENCE:
            agents_inference.append(agent)
        elif agent.control_mode == agent.ControlModes.HUMAN:
            agents_heuristic.append(agent)
        elif agent.control_mode == agent.ControlModes.RECORD_EXPERT_DEMOS:
            assert(
                not agent_demo_record,
                "Currently only a single AIController can be used for recording expert demos."
            )
            agent_demo_record = agent
    var training_agent_count = agents_training.size()
    agents_training_policy_names.resize(training_agent_count)
    for i in range(0, training_agent_count):
        agents_training_policy_names[i] = agents_training[i].policy_name


func _set_heuristic(heuristic, agents: Array):
    for agent in agents:
        agent.set_heuristic(heuristic)

func _handshake():
    print("performing handshake")
    var json_dict = _get_dict_json_message()
    assert(json_dict["type"] == "handshake")
    var major_version = json_dict["major_version"]
    var minor_version = json_dict["minor_version"]
    if major_version != MAJOR_VERSION:
        print("WARNING: major version mismatch ", major_version, " ", MAJOR_VERSION)
    if minor_version != MINOR_VERSION:
        print("WARNING: minor version mismatch ", minor_version, " ", MINOR_VERSION)
    print("handshake complete")

func _get_dict_json_message():
    # Returns a dictionary parsed from the most recent message.
    # Note: this blocks, polling until data is available (or the server disconnects).
    while stream.get_available_bytes() == 0:
        stream.poll()
        if stream.get_status() != StreamPeerTCP.STATUS_CONNECTED:
            print("server disconnected status, closing")
            get_tree().quit()
            return null
        OS.delay_usec(10)
    var message = stream.get_string()
    var json_data = JSON.parse_string(message)
    return json_data


func _send_dict_as_json_message(dict):
    stream.put_string(JSON.stringify(dict, "", false))

func _send_env_info():
    var json_dict = _get_dict_json_message()
    assert(json_dict["type"] == "env_info")
    var message = {
        "type": "env_info",
        "observation_space": _obs_space_training,
        "action_space": _action_space_training,
        "n_agents": len(agents_training),
        "agent_policy_names": agents_training_policy_names,
    }
    _send_dict_as_json_message(message)

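# Illustrative env_info reply for a single training agent (the space dicts come
# from each AIController's get_obs_space()/get_action_space(); their exact
# shape below is an assumption, not fixed by this script):
#   {"type": "env_info",
#    "observation_space": [{"obs": {"size": [4], "space": "box"}}],
#    "action_space": [{"move": {"size": 2, "action_type": "continuous"}}],
#    "n_agents": 1, "agent_policy_names": ["shared_policy"]}
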
func connect_to_server():
    print("Waiting for one second to allow server to start")
    OS.delay_msec(1000)
    print("trying to connect to server")
    stream = StreamPeerTCP.new()
    # "localhost" was not working on a Windows VM, had to use the IP
    var ip = "127.0.0.1"
    var port = _get_port()
    var _connect_err = stream.connect_to_host(ip, port)
    stream.set_no_delay(true)  # TODO: check if this improves performance or not
    stream.poll()
    # Poll the status until it is either STATUS_CONNECTED or STATUS_ERROR
    while stream.get_status() < StreamPeerTCP.STATUS_CONNECTED:
        stream.poll()
    return stream.get_status() == StreamPeerTCP.STATUS_CONNECTED

func _get_args():
    print("getting command line arguments")
    var arguments = {}
    for argument in OS.get_cmdline_args():
        print(argument)
        if argument.find("=") > -1:
            var key_value = argument.split("=")
            arguments[key_value[0].lstrip("--")] = key_value[1]
        else:
            # Options without an argument will be present in the dictionary,
            # with the value set to an empty string.
            arguments[argument.lstrip("--")] = ""
    return arguments

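# Example: launching with `--port=12008 --speedup=8` yields
#   {"port": "12008", "speedup": "8"}
# and a bare flag like `--foo` (hypothetical) maps to {"foo": ""}. Note that
# String.lstrip("--") strips leading '-' characters, not a literal "--" prefix.
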
func _get_speedup():
    print(args)
    return args.get("speedup", str(speed_up)).to_float()


func _get_port():
    return args.get("port", DEFAULT_PORT).to_int()


func _set_seed():
    var _seed = args.get("env_seed", DEFAULT_SEED).to_int()
    seed(_seed)


func _set_action_repeat():
    action_repeat = args.get("action_repeat", str(action_repeat)).to_int()


func disconnect_from_server():
    stream.disconnect_from_host()

func handle_message() -> bool:
    # Get the next JSON message: reset, close, call, or action.
    var message = _get_dict_json_message()

    if message["type"] == "close":
        print("received close message, closing game")
        get_tree().quit()
        get_tree().set_pause(false)
        return true

    if message["type"] == "reset":
        print("resetting all agents")
        _reset_agents()
        just_reset = true
        get_tree().set_pause(false)
        #print("resetting forcing draw")
        # RenderingServer.force_draw()
        # var obs = _get_obs_from_agents()
        # print("obs ", obs)
        # var reply = {
        #     "type": "reset",
        #     "obs": obs
        # }
        # _send_dict_as_json_message(reply)
        return true

    if message["type"] == "call":
        var method = message["method"]
        var returns = _call_method_on_agents(method)
        var reply = {"type": "call", "returns": returns}
        print("calling method from Python")
        _send_dict_as_json_message(reply)
        return handle_message()

    if message["type"] == "action":
        var action = message["action"]
        _set_agent_actions(action, agents_training)
        need_to_send_obs = true
        get_tree().set_pause(false)
        return true

    print("message was not handled")
    return false

func _call_method_on_agents(method):
    var returns = []
    for agent in all_agents:
        returns.append(agent.call(method))
    return returns


func _reset_agents_if_done(agents = all_agents):
    for agent in agents:
        if agent.get_done():
            agent.set_done_false()


func _reset_agents(agents = all_agents):
    for agent in agents:
        agent.needs_reset = true
        #agent.reset()


func _get_obs_from_agents(agents: Array = all_agents):
    var obs = []
    for agent in agents:
        obs.append(agent.get_obs())
    return obs


func _get_reward_from_agents(agents: Array = agents_training):
    var rewards = []
    for agent in agents:
        rewards.append(agent.get_reward())
        agent.zero_reward()
    return rewards


func _get_done_from_agents(agents: Array = agents_training):
    var dones = []
    for agent in agents:
        var done = agent.get_done()
        if done:
            agent.set_done_false()
        dones.append(done)
    return dones


func _set_agent_actions(actions, agents: Array = all_agents):
    for i in range(len(actions)):
        agents[i].set_action(actions[i])

func clamp_array(arr: Array, min_value: float, max_value: float):
    # Parameters renamed to avoid shadowing the built-in min()/max() functions.
    var output: Array = []
    for a in arr:
        output.append(clamp(a, min_value, max_value))
    return output

## Saves recorded expert demos on window exit (i.e. when the game window is
## closed, rather than when "Stop" is pressed in the Godot Editor).
func _notification(what):
    if demo_trajectories.size() == 0 or expert_demo_save_path.is_empty():
        return

    if what == NOTIFICATION_PREDELETE:
        var json_string = JSON.stringify(demo_trajectories, "", false)
        var file = FileAccess.open(expert_demo_save_path, FileAccess.WRITE)
        if not file:
            var error: Error = FileAccess.get_open_error()
            assert(not error, "There was an error opening the file: %d" % error)
        file.store_line(json_string)
        var error = file.get_error()
        assert(not error, "There was an error after trying to write to the file: %d" % error)