Source code for textworld.envs.glulx.git_glulx

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT license.


# -*- coding: utf-8 -*-
import sys
import textwrap
import subprocess
from pkg_resources import Requirement, resource_filename

from typing import Union

from glk import ffi, lib
from io import StringIO

import textworld
from textworld.core import GameState
from textworld.core import GameNotRunningError

GLULX_PATH = resource_filename(Requirement.parse('textworld'), 'textworld/thirdparty/glulx/Git-Glulx')


def _strip_input_prompt_symbol(text: str) -> str:
    if text.endswith("\n>"):
        return text[:-2]

    return text


[docs]class GitGlulxEnv(textworld.Environment): """ Environment to support playing Glulx games. This environment supports playing text-based games that were compiled for the `Glulx virtual machine <https://www.eblong.com/zarf/glulx>`_. The main advantage of using Glulx over Z-Machine is it uses 32-bit data and addresses, so it can handle game files up to four gigabytes long. This comes handy when we want to generate large world with a lot of objects in it. We use a customized version of `git-glulx <https://github.com/DavidKinder/Git>`_ as the glulx interpreter. That way we don't rely on stdin/stdout to communicate with the interpreter but instead use UNIX sockets. """ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._process = None
[docs] def close(self) -> None: if self.game_running: self._process.kill() self._process.wait() self._process = None try: lib.cleanup_glulx(self._names_struct) except AttributeError: pass # Attempted to kill before reset
def __del__(self): self.close()
[docs] def load(self, ulx_file: str) -> None: # TODO check file format. self.close() # Terminate existing process if needed. self._gamefile = ulx_file
@property def game_running(self) -> bool: """ Determines if the game is still running. """ return self._process is not None and self._process.poll() is None
[docs] def step(self, command: str) -> str: if not self.game_running: raise GameNotRunningError() self.state = GameState() self.state.last_command = command.strip() self.state.raw = self._send(self.state.last_command) if self.state.raw is None: raise GameNotRunningError() self.state.feedback = _strip_input_prompt_symbol(self.state.raw) self.state.score = 0 # Default value. self.state.done = False # Default value. return self.state, self.state.score, self.state.done
def _send(self, command: str) -> Union[str, None]: """ Send a command directly to the interpreter. This method will not affect the internal state variable. """ if not self.game_running: return None if len(command) == 0: command = " " c_command = ffi.new('char[]', command.encode('utf-8')) result = lib.communicate(self._names_struct, c_command) if result == ffi.NULL: self.close() return None result = ffi.gc(result, lib.free) return ffi.string(result).decode('utf-8')
[docs] def reset(self) -> str: self.close() # Terminate existing process if needed. self._names_struct = ffi.new('struct sock_names*') lib.init_glulx(self._names_struct) sock_name = ffi.string(self._names_struct.sock_name).decode('utf-8') self._process = subprocess.Popen(["%s/git-glulx-ml" % (GLULX_PATH,), self._gamefile, '-g', sock_name, '-q']) c_feedback = lib.get_output_nosend(self._names_struct) if c_feedback == ffi.NULL: self.close() raise ValueError("Game failed to start properly: {}.".format(self._gamefile)) c_feedback = ffi.gc(c_feedback, lib.free) feedback = ffi.string(c_feedback).decode('utf-8') feedback = _strip_input_prompt_symbol(feedback) self.state = GameState(feedback=feedback, raw=feedback) return self.state
[docs] def render(self, mode: str = "human") -> None: outfile = StringIO() if mode in ['ansi', "text"] else sys.stdout msg = self.state.feedback.rstrip() + "\n" if self.display_command_during_render and self.state.last_command is not None: msg = '> ' + self.state.last_command + "\n" + msg # Wrap each paragraph. if mode == "human": paragraphs = msg.split("\n") paragraphs = ["\n".join(textwrap.wrap(paragraph, width=80)) for paragraph in paragraphs] msg = "\n".join(paragraphs) outfile.write(msg + "\n") if mode == "text": outfile.seek(0) return outfile.read() if mode == 'ansi': return outfile