# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT license.
# -*- coding: utf-8 -*-
import sys
import textwrap
import subprocess
import importlib.resources
from typing import Union
from os.path import join as pjoin
from glk import ffi, lib
from io import StringIO
import textworld
from textworld.core import GameState
from textworld.core import GameNotRunningError
GLULX_PATH = pjoin(importlib.resources.files("textworld"), "thirdparty", "glulx", "Git-Glulx")
def _strip_input_prompt_symbol(text: str) -> str:
if text.endswith("\n>"):
return text[:-2]
return text
[docs]class GitGlulxEnv(textworld.Environment):
""" Environment to support playing Glulx games.
This environment supports playing text-based games that were compiled for
the `Glulx virtual machine <https://www.eblong.com/zarf/glulx>`_. The main
advantage of using Glulx over Z-Machine is it uses 32-bit data and
addresses, so it can handle game files up to four gigabytes long. This
comes handy when we want to generate large world with a lot of objects
in it.
We use a customized version of `git-glulx <https://github.com/DavidKinder/Git>`_
as the glulx interpreter. That way we don't rely on stdin/stdout to
communicate with the interpreter but instead use UNIX sockets.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._process = None
[docs] def close(self) -> None:
if self.game_running:
self._process.kill()
self._process.wait()
self._process = None
try:
lib.cleanup_glulx(self._names_struct)
except AttributeError:
pass # Attempted to kill before reset
def __del__(self):
self.close()
[docs] def load(self, ulx_file: str) -> None:
# TODO check file format.
self.close() # Terminate existing process if needed.
self._gamefile = ulx_file
@property
def game_running(self) -> bool:
""" Determines if the game is still running. """
return self._process is not None and self._process.poll() is None
[docs] def step(self, command: str) -> str:
if not self.game_running:
raise GameNotRunningError()
self.state = GameState()
self.state.last_command = command.strip()
self.state.raw = self._send(self.state.last_command)
if self.state.raw is None:
raise GameNotRunningError()
self.state.feedback = _strip_input_prompt_symbol(self.state.raw)
self.state.score = 0 # Default value.
self.state.done = False # Default value.
return self.state, self.state.score, self.state.done
def _send(self, command: str) -> Union[str, None]:
""" Send a command directly to the interpreter.
This method will not affect the internal state variable.
"""
if not self.game_running:
return None
if len(command) == 0:
command = " "
c_command = ffi.new('char[]', command.encode('utf-8'))
result = lib.communicate(self._names_struct, c_command)
if result == ffi.NULL:
self.close()
return None
result = ffi.gc(result, lib.free)
return ffi.string(result).decode('utf-8')
[docs] def reset(self) -> str:
self.close() # Terminate existing process if needed.
self._names_struct = ffi.new('struct sock_names*')
lib.init_glulx(self._names_struct)
sock_name = ffi.string(self._names_struct.sock_name).decode('utf-8')
self._process = subprocess.Popen(["%s/git-glulx-ml" % (GLULX_PATH,), self._gamefile, '-g', sock_name, '-q'])
c_feedback = lib.get_output_nosend(self._names_struct)
if c_feedback == ffi.NULL:
self.close()
raise ValueError("Game failed to start properly: {}.".format(self._gamefile))
c_feedback = ffi.gc(c_feedback, lib.free)
feedback = ffi.string(c_feedback).decode('utf-8')
feedback = _strip_input_prompt_symbol(feedback)
self.state = GameState(feedback=feedback, raw=feedback)
return self.state
[docs] def render(self, mode: str = "human") -> None:
outfile = StringIO() if mode in ['ansi', "text"] else sys.stdout
msg = self.state.feedback.rstrip() + "\n"
if self.display_command_during_render and self.state.last_command is not None:
msg = '> ' + self.state.last_command + "\n" + msg
# Wrap each paragraph.
if mode == "human":
paragraphs = msg.split("\n")
paragraphs = ["\n".join(textwrap.wrap(paragraph, width=80)) for paragraph in paragraphs]
msg = "\n".join(paragraphs)
outfile.write(msg + "\n")
if mode == "text":
outfile.seek(0)
return outfile.read()
if mode == 'ansi':
return outfile