# Source code for textworld.envs.glulx.git_glulx

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT license.


# -*- coding: utf-8 -*-
import sys
import textwrap
import subprocess

import importlib.resources
from typing import Union
from os.path import join as pjoin


from glk import ffi, lib
from io import StringIO

import textworld
from textworld.core import GameState
from textworld.core import GameNotRunningError

# Directory "thirdparty/glulx/Git-Glulx" located under the installed
# `textworld` package; `GitGlulxEnv.reset` launches the `git-glulx-ml`
# executable from this directory.
GLULX_PATH = pjoin(importlib.resources.files("textworld"), "thirdparty", "glulx", "Git-Glulx")


def _strip_input_prompt_symbol(text: str) -> str:
    if text.endswith("\n>"):
        return text[:-2]

    return text


class GitGlulxEnv(textworld.Environment):
    """ Environment to support playing Glulx games.

    This environment supports playing text-based games that were compiled
    for the `Glulx virtual machine <https://www.eblong.com/zarf/glulx>`_.
    The main advantage of using Glulx over Z-Machine is it uses 32-bit data
    and addresses, so it can handle game files up to four gigabytes long.
    This comes in handy when we want to generate large worlds with a lot of
    objects in them.

    We use a customized version of `git-glulx <https://github.com/DavidKinder/Git>`_
    as the glulx interpreter. That way we don't rely on stdin/stdout to
    communicate with the interpreter but instead use UNIX sockets.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments to the parent class; no game is started yet."""
        super().__init__(*args, **kwargs)
        self._process = None  # Interpreter subprocess, set by `reset`.
        self._names_struct = None  # cffi `struct sock_names*`, set by `reset`.

    def close(self) -> None:
        """Kill the interpreter (if running) and release the native struct.

        Safe to call repeatedly and before `reset`: the native cleanup is
        performed at most once per `lib.init_glulx` call.
        """
        if self.game_running:
            self._process.kill()
            self._process.wait()
            self._process = None

        # `close` is reached from `load`, `reset`, `_send` and `__del__`, so it
        # routinely runs several times for one `init_glulx`. Forget the struct
        # before cleaning it up so the same struct is never handed to
        # `cleanup_glulx` twice.
        # NOTE(review): presumably `cleanup_glulx` releases socket resources;
        # confirm against the C helper.
        # `getattr` keeps the original tolerance for instances on which
        # `_names_struct` was never assigned (closed before any `reset`).
        names_struct = getattr(self, "_names_struct", None)
        if names_struct is not None:
            self._names_struct = None
            lib.cleanup_glulx(names_struct)

    def __del__(self) -> None:
        """Release the interpreter when the environment is garbage collected."""
        self.close()

    def load(self, ulx_file: str) -> None:
        """Remember `ulx_file` as the game to launch on the next `reset`.

        Args:
            ulx_file: Path of the game file passed to the interpreter.
        """
        # TODO check file format.
        self.close()  # Terminate existing process if needed.
        self._gamefile = ulx_file

    @property
    def game_running(self) -> bool:
        """ Determines if the game is still running. """
        return self._process is not None and self._process.poll() is None

    def step(self, command: str) -> tuple[GameState, int, bool]:
        """Send `command` to the game and build the resulting state.

        Args:
            command: Text command; surrounding whitespace is stripped.

        Returns:
            A ``(state, score, done)`` tuple. `score` and `done` are the
            default values ``0`` and ``False``.

        Raises:
            GameNotRunningError: If the interpreter is not running, or if
                it stops answering while handling the command.
        """
        if not self.game_running:
            raise GameNotRunningError()

        self.state = GameState()
        self.state.last_command = command.strip()
        self.state.raw = self._send(self.state.last_command)
        if self.state.raw is None:
            raise GameNotRunningError()

        self.state.feedback = _strip_input_prompt_symbol(self.state.raw)
        self.state.score = 0  # Default value.
        self.state.done = False  # Default value.
        return self.state, self.state.score, self.state.done

    def _send(self, command: str) -> Union[str, None]:
        """ Send a command directly to the interpreter.

        This method will not affect the internal state variable.

        Returns:
            The decoded interpreter output, or `None` when the game is not
            running or the interpreter returned no answer (in which case
            the environment is closed).
        """
        if not self.game_running:
            return None

        if not command:
            # An empty command is replaced by a single space before sending.
            command = " "

        c_command = ffi.new('char[]', command.encode('utf-8'))
        result = lib.communicate(self._names_struct, c_command)
        if result == ffi.NULL:
            self.close()
            return None

        # Have cffi call `lib.free` on the buffer once it is garbage collected.
        result = ffi.gc(result, lib.free)
        return ffi.string(result).decode('utf-8')

    def reset(self) -> GameState:
        """(Re)start the loaded game and return its initial state.

        Returns:
            A `GameState` whose `feedback` and `raw` both hold the opening
            text with the trailing input prompt removed.

        Raises:
            ValueError: If the interpreter produces no initial output.
        """
        self.close()  # Terminate existing process if needed.

        self._names_struct = ffi.new('struct sock_names*')
        lib.init_glulx(self._names_struct)
        sock_name = ffi.string(self._names_struct.sock_name).decode('utf-8')
        self._process = subprocess.Popen(
            [pjoin(GLULX_PATH, "git-glulx-ml"), self._gamefile, '-g', sock_name, '-q']
        )

        c_feedback = lib.get_output_nosend(self._names_struct)
        if c_feedback == ffi.NULL:
            self.close()
            raise ValueError("Game failed to start properly: {}.".format(self._gamefile))

        # Have cffi call `lib.free` on the buffer once it is garbage collected.
        c_feedback = ffi.gc(c_feedback, lib.free)

        feedback = ffi.string(c_feedback).decode('utf-8')
        feedback = _strip_input_prompt_symbol(feedback)
        self.state = GameState(feedback=feedback, raw=feedback)
        return self.state

    def render(self, mode: str = "human") -> Union[str, StringIO, None]:
        """Render the current state's feedback.

        Args:
            mode: ``"human"`` writes to `sys.stdout` with each paragraph
                wrapped at 80 columns; ``"text"`` returns the message as a
                string; ``"ansi"`` returns the `StringIO` buffer holding
                it. Any other value writes the unwrapped message to
                `sys.stdout`.

        Returns:
            A `str` for ``"text"``, a `StringIO` for ``"ansi"``, otherwise
            `None`.
        """
        outfile = StringIO() if mode in ("ansi", "text") else sys.stdout

        msg = self.state.feedback.rstrip() + "\n"
        if self.display_command_during_render and self.state.last_command is not None:
            msg = '> ' + self.state.last_command + "\n" + msg

        # Wrap each paragraph.
        if mode == "human":
            paragraphs = msg.split("\n")
            paragraphs = ["\n".join(textwrap.wrap(paragraph, width=80))
                          for paragraph in paragraphs]
            msg = "\n".join(paragraphs)

        outfile.write(msg + "\n")

        if mode == "text":
            outfile.seek(0)
            return outfile.read()

        if mode == 'ansi':
            return outfile

        return None