- Joined
- Jan 23, 2015
- Messages
- 131
Amazing idea, yet very easily implemented. Great find!
CUSTOM_MAP_DATA_PATH to point to your CustomMapData folderpython ...\wc3_interpreter.py (type help for list of commands and usage)help for a list of available commands.file command. The new function will run over the old one, and you can test it again without restarting wc3 and rebuilding the map. You can also add breakpoints to inspect and change variable values-console, but this was annoying to do with the limited ingame chat. If you're playing in multiplayer, you can later check the values in the replay.-- Useful globals that tells you if the game is offline (single player I think), online or replay.
-- The first 3 are constants and should not be changed
GAME_STATUS_OFFLINE = 0
GAME_STATUS_ONLINE = 1
GAME_STATUS_REPLAY = 2
GameStatus = 0 -- one of GAME_STATUS_OFFLINE, GAME_STATUS_ONLINE, GAME_STATUS_REPLAY
-- sets GameStatus variable which decides if game is online, offline or replay
local function SetGameStatus()
local firstPlayer ---@type player
local u ---@type unit
local selected ---@type boolean
-- find an actual player
firstPlayer = Player(0)
while not ((GetPlayerController(firstPlayer) == MAP_CONTROL_USER and GetPlayerSlotState(firstPlayer) == PLAYER_SLOT_STATE_PLAYING)) do
firstPlayer = Player(GetPlayerId(firstPlayer) + 1)
end
-- force the player to select a dummy unit
u = CreateUnit(firstPlayer, FourCC('hfoo'), 0, 0, 0)
SelectUnit(u, true)
selected = IsUnitSelected(u, firstPlayer)
RemoveUnit(u)
if (selected) then
-- detect if replay or offline game
if (ReloadGameCachesFromDisk()) then
GameStatus = GAME_STATUS_OFFLINE
else
GameStatus = GAME_STATUS_REPLAY
end
else
-- if the unit wasn't selected instantly, the game is online
GameStatus = GAME_STATUS_ONLINE
end
end
OnInit.global(SetGameStatus)
if Debug then Debug.beginFile("LiveCoding") end
--[[
Lua Live Coding v1.5.0 by Tomotz
This tool allows connecting to your game with an external cli, and run lua code in it - it allows you to open a windows terminal and run code inside your game. Works for single player and in replays
Features:
- A cli interpreter that connects to your game while it's running, and runs lua code inside the game.
- Write single line lua instructions in the terminal, and have them run inside the game.
- Get command output in the terminal.
- Run lua script files.
- Run new code during replay and let you debug the replay.
- Breakpoint support allowing you to debug in the breakpoint context.
Note that currently the interpreter does not support multiplayer (It will not run if there is more than one active player). Support for multiplayer can be added but will be a bit complicated since the backend files data needs to be synced. If I'll see a demand for the feature, I'll add it.
Installation and usage instructions:
- Copy the lua code to your map and install the requirements.
- Install python3 (tested with python 3.9 but would probably work with any python3 version)
- pip install watchdog
- Create wc3_interpreter.py script, and edit `CUSTOM_MAP_DATA_PATH` to point to your `CustomMapData` folder
- In windows terminal run `python <full path>\wc3_interpreter.py` (type help for list of commands and usage)
- Tip - if you want to debug a replay, run warcraft with nowfpause, and then you can alt tab to the shell without the game pausing:
"C:\Program Files (x86)\Warcraft III\_retail_\x86_64\Warcraft III.exe" -launch -nowfpause
Run wc3_interpreter.py and then `help` for a list of available commands.
* Note: exiting or restarting the script while the game is running will cause a missalignment in file ids. You must use jump command to fix it
Algorithm explanation:
The lua code polls on the creation of new files with increasing indices (in0.txt, in1.txt, ...). When a new file is found, it reads the content, runs it as lua code, and saves the output to a single out.txt file with the command index.
For breakpoints, each coroutine writes its data to a per-coroutine file (bp_data_<thread_id>.txt) and a shared metadata file (bp_threads.txt) lists all active breakpoint threads.
Suggested usages:
- Map Development - You created a new global function, you test your map and it doesn't do what you meant. You can now create a file with this function, edit what you wish, and run `file` command. The new function will run over the old one, and you can test it again without restarting wc3 and rebuilding the map.
- Value Lookups - You can check variable values and other state checks while playing (in single player). You could already do that with DebugUtils `-console`, but this was annoying to do with the limited ingame chat. If you're playing in multiplayer, you can later check the values in the replay.
- Map Debugging - Reimplement global functions dynamically while playing, and add prints and logs as needed
- Replay Debugging - Perform quarries or make things happen differently at replay - change values of variables, create new units etc.
Requires:
My version of FileIO (lua). Original version by Trokkin - https://www.hiveworkshop.com/threads/fileio-lua-optimized.347049/
StringEscape (by me) - https://www.hiveworkshop.com/threads/optimized-syncstream-and-stringescape.367925/
TotalInitialization by Bribe - https://www.hiveworkshop.com/threads/total-initialization.317099/
Optionaly requires:
PrettyString (by me) - https://www.hiveworkshop.com/threads/logutils.357625/. If you want better output formatting for tables and wc3 handles
Credits:
TriggerHappy GameStatus (Replay Detection) https://www.hiveworkshop.com/threads/gamestatus-replay-detection.293176/
* SetGameStatus was taken from there to allow detecting that the game is running in replay mode.
--]]
do
-- Period to check for new commands to execute
-- Note that once a command was executed, the polling period increases to 0.1 seconds to allow fast interpreting.
-- The period goes back to normal after no new commands were found for 60 seconds.
local PERIOD = 5
-- Directory to save the input/output files. Must match the python script path
local FILES_ROOT = "Interpreter"
-- The code will desync in multiplayer, so we only allow running it in single player or replay mode
local isDisabled ---@type boolean
EnabledBreakpoints = {} ---@type table<string | integer, boolean> -- saves for each breakpoint id if it is disabled. Allows the debugger to disable/enable bps
-- Field separator for data files (ASCII 31 = unit separator)
local FIELD_SEP = string.char(31)
-- ============================================================================
-- Shared Output File Format
-- ============================================================================
-- Both normal commands and breakpoints use a similar output format:
-- For normal commands: out.txt contains "{index}SEPARATOR{result}"
-- For breakpoint results: bp_out.txt contains "{thread_id}:{cmd_index}SEPARATOR{result}"
--- Write a result to an output file with an index prefix
--- This is the shared format used by both normal commands and breakpoints
---@param filename string -- The output file path
---@param index string|integer -- The command index (or thread_id:cmd_index for breakpoints)
---@param result string -- The result to write
local function writeIndexedOutput(filename, index, result)
FileIO.Save(filename, tostring(index) .. FIELD_SEP .. result, false)
end
--- Helper to format output for responses
---@param value any
---@return string
local function formatOutput(value)
if PrettyString then
return PrettyString(value)
end
return tostring(value)
end
--- Create an environment table that includes localVariables with globals as fallback
---@param localVariables {[1]: string, [2]: any}[]? -- Array of {name, value} pairs
---@return table
local function createBreakpointEnv(localVariables)
local env = {}
setmetatable(env, {__index = _G})
if localVariables then
for _, pair in ipairs(localVariables) do
local name, value = pair[1], pair[2]
env[name] = value
end
end
return env
end
-- ============================================================================
-- Breakpoint System
-- ============================================================================
-- Breakpoints use per-coroutine data files and a shared metadata file:
-- - bp_threads.txt: Lists all thread IDs currently in a breakpoint (one per line)
-- - bp_data_<thread_id>.txt: Contains breakpoint data as a single FIELD_SEP-separated record:
-- bp_id<SEP>value<SEP>stack<SEP>stacktrace<SEP>var1<SEP>val1<SEP>var2<SEP>val2...
-- - bp_in_<thread_id>_<idx>.txt: Per-thread input commands (incrementing files due to WC3 file caching)
-- Content is just the raw command (no prefix needed since thread_id is in filename)
-- - bp_out.txt: Output results with format "thread_id:cmd_indexFIELD_SEPresult"
local activeBreakpointThreads = {} ---@type table<string, boolean> -- Maps thread_id to true if in breakpoint
local nextBreakpointCmdIndex = {} ---@type table<string, integer> -- Maps thread_id to next command index (persists across Breakpoint calls)
--- Get a unique identifier for the current coroutine
---@return string
local function getThreadId()
local co = coroutine.running()
if co then
return tostring(co):match("thread: (.+)") or tostring(co)
end
return "main"
end
--- Update the bp_threads.txt metadata file with all active breakpoint threads
local function updateBreakpointThreadsFile()
local threads = {}
for threadId, _ in pairs(activeBreakpointThreads) do
table.insert(threads, threadId)
end
FileIO.Save(FILES_ROOT .. "\\bp_threads.txt", table.concat(threads, FIELD_SEP), false)
end
--- Write breakpoint data file for a specific thread
--- Uses FIELD_SEP (ASCII 31) as field separator to avoid conflicts with variable values
--- Format: bp_id<SEP>value<SEP>stack<SEP>stacktrace<SEP>var1<SEP>val1<SEP>var2<SEP>val2...
---@param threadId string
---@param breakpointId string|integer
---@param localVariables {[1]: string, [2]: any}[]? -- Array of {name, value} pairs
---@param env table? -- Environment table to read current values from (for modified variables)
local function writeBreakpointDataFile(threadId, breakpointId, localVariables, env)
local fields = {}
-- Add bp_id
table.insert(fields, "bp_id")
table.insert(fields, tostring(breakpointId))
-- Add stacktrace (use Debug.traceback for WC3 compatibility)
local stack = ""
if Debug and Debug.traceback then
stack = Debug.traceback() or ""
end
table.insert(fields, "stack")
table.insert(fields, stack)
-- Add local variable values in order (read from env if provided, otherwise from original pairs)
if localVariables then
for _, pair in ipairs(localVariables) do
local name = pair[1]
local value = env and env[name] or pair[2]
table.insert(fields, name)
table.insert(fields, formatOutput(value))
end
end
FileIO.Save(FILES_ROOT .. "\\bp_data_" .. threadId .. ".txt", table.concat(fields, FIELD_SEP), false)
end
--- Remove breakpoint data file for a specific thread
---@param threadId string
local function removeBreakpointDataFile(threadId)
-- Write empty content to indicate the breakpoint is no longer active
FileIO.Save(FILES_ROOT .. "\\bp_data_" .. threadId .. ".txt", "", false)
end
-- Helper function to return all local variable values from the environment
---@param env table -- Environment table
---@param vars {[1]: string, [2]: any}[]? -- Array of {name, value} pairs
---@return any ...
local function returnLocalValues(env, vars)
if not vars or #vars == 0 then return end
local values = {}
for _, pair in ipairs(vars) do
local name = pair[1]
table.insert(values, env[name])
end
return table.unpack(values)
end
---@param threadId string
---@param cmdIndex string|integer
---@param data string
local function writeBPOutput(threadId, cmdIndex, data)
writeIndexedOutput(FILES_ROOT .. "\\bp_out.txt", threadId .. ":" .. cmdIndex, data)
end
--- Put a breakpoint in your code that will halt execution of a function and wait for external debugger instructions.
--- Returns the (potentially modified) local variable values in the same order as the input array.
--- Usage: `var1, var2 = Breakpoint(id, {{"var1", var1}, {"var2", var2}})`
---@param breakpointId integer | string -- Unique id for the breakpoint. Used for auto breakpoints set from the debugger. When called from user code, it should contain a unique string (that is not a number) to allow you to recognise the breakpoint.
---@param localVariables {[1]: string, [2]: any}[]? -- Array of {name, value} pairs. Will be used as the environment in the code called from the debugger. The values can be modified by the debugger and will be returned when the breakpoint continues.
---@param condition string? -- a string containing a lua expression. Breakpoint will only trigger if the expression is true.
---@param startsEnabled boolean? -- if false, the breakpoint will start disabled and must be enabled from the debugger. Default is true. This allows you to dynamically enable a static breakpoint (which can be set anywhere in the code unlike the dynamic one)
---@return any ... -- Returns the local variable values in the same order as the input array (potentially modified by the debugger)
--- Notes: 1. This function should only be called from yieldable context.
--- 2. Execution of the thread will not continue unless you connect the debugger, be sure not to keep breakpoints in your final code.
--- 3. To avoid desyncs, this function will do nothing in multiplayer
function Breakpoint(breakpointId, localVariables, condition, startsEnabled)
-- Create environment with locals and globals accessible
local env = createBreakpointEnv(localVariables)
if isDisabled then return returnLocalValues(env, localVariables) end
if not coroutine.isyieldable() then
if Debug then Debug.throwError("Coroutine is not yieldable.") end
return returnLocalValues(env, localVariables)
end
if EnabledBreakpoints[breakpointId] == nil then
EnabledBreakpoints[breakpointId] = (startsEnabled == nil) or startsEnabled
end
if EnabledBreakpoints[breakpointId] == false then return returnLocalValues(env, localVariables) end
if condition then
local cond = load(condition, "breakpoint_condition", "t", env)
if cond == nil then
if Debug then Debug.throwError("error executing breakpoint condition") end
return returnLocalValues(env, localVariables)
end
if not cond() then return returnLocalValues(env, localVariables) end
end
-- Get thread ID and register this breakpoint
local threadId = getThreadId()
activeBreakpointThreads[threadId] = true
-- Write breakpoint data file and update metadata
writeBreakpointDataFile(threadId, breakpointId, localVariables, env)
updateBreakpointThreadsFile()
-- Main breakpoint loop - wait for commands
-- Uses per-thread incrementing bp_in_<threadId>_<idx>.txt files due to WC3 file caching
-- Command index persists across Breakpoint() calls for the same thread to avoid reading stale files
local cmdIndex = nextBreakpointCmdIndex[threadId] or 0
while true do
-- Check for commands in bp_in_<threadId>_<cmdIndex>.txt
local filename = FILES_ROOT .. "\\bp_in_" .. threadId .. "_" .. cmdIndex .. ".txt"
local command = FileIO.Load(filename)
if command ~= nil then
-- Command found - file content is just the raw command
if command == "continue" then
-- Acknowledge the continue command for the debugger protocol
writeBPOutput(threadId, cmdIndex, "")
-- Advance and persist the index so next Breakpoint() for this thread starts fresh
nextBreakpointCmdIndex[threadId] = cmdIndex + 1
-- Clean up and exit breakpoint
activeBreakpointThreads[threadId] = nil
removeBreakpointDataFile(threadId)
updateBreakpointThreadsFile()
-- Return the (potentially modified) local variable values
return returnLocalValues(env, localVariables)
end
-- Execute the command with proper error handling
local cur_func, loadErr = load(command, "breakpoint_cmd", "t", env)
local outData
if cur_func == nil then
outData = "Syntax error: " .. tostring(loadErr)
else
local ok, result = pcall(cur_func)
if ok then
outData = formatOutput(result)
else
outData = "Runtime error: " .. tostring(result)
end
end
-- Write result using shared format (thread_id:cmd_index as index)
writeBPOutput(threadId, cmdIndex, outData)
-- Update breakpoint data file (in case locals changed)
writeBreakpointDataFile(threadId, breakpointId, localVariables, env)
-- Advance and persist the index
cmdIndex = cmdIndex + 1
end
TriggerSleepAction(0.1)
end
end
-- ============================================================================
-- Normal Command System
-- ============================================================================
-- Uses single out.txt file with format: "{index}SEPARATOR{result}"
local nextFile = 0
local curPeriod = PERIOD
local lastCommandExecuteTime = 0
local timer = nil ---@type timer?
function CheckFiles()
--- first we trigger the next run in case this run crashes or returns
TimerStart(timer, curPeriod, false, CheckFiles)
-- To make the replay as close as possible to the original game, we do call the timer on both, and just return right away if multiplayer
if isDisabled then return end
local commands = FileIO.Load(FILES_ROOT .. "\\in" .. nextFile .. ".txt")
if commands ~= nil then
-- command found, increase period to 0.1s, run the command and return the result
curPeriod = 0.1
TimerStart(timer, 0.1, false, CheckFiles)
lastCommandExecuteTime = os.clock()
local cur_func = load(commands)
local result = nil
if cur_func ~= nil then
local ok = false
ok, result = pcall(cur_func)
end
-- Use shared output format: index on first line, result on subsequent lines
writeIndexedOutput(FILES_ROOT .. "\\out.txt", nextFile, tostring(result))
nextFile = nextFile + 1
end
if os.clock() - lastCommandExecuteTime > 60 then
-- over 60s passed since last command sent. Return the period to normal
curPeriod = PERIOD
end
end
---@param period number? -- Optional period in seconds for polling. Defaults to 5 seconds. Useful for testing with shorter periods.
function TryInterpret(period)
isDisabled = ((not GameStatus) or GameStatus == GAME_STATUS_ONLINE) and (not bj_isSinglePlayer)
-- Timer is leaked on purpose to keep it running throughout the entire game
timer = CreateTimer()
TimerStart(timer, period or 5, false, CheckFiles)
end
OnInit(function() TryInterpret(5) end)
end
if Debug then Debug.endFile() end
# wc3_interpreter.py
VERSION = "1.5.0"
import os
import re
import time
import signal
import sys
import threading
from queue import Queue, Empty
from typing import Optional, Dict, List, Tuple
# Optional watchdog import for file watching functionality
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
WATCHDOG_AVAILABLE = True
except ImportError:
WATCHDOG_AVAILABLE = False
# You might need to change `D:` to your Warcraft III installation drive
def _get_username() -> str:
try:
return os.getlogin()
except OSError:
return os.environ.get('USER', os.environ.get('USERNAME', 'User'))
CUSTOM_MAP_DATA_PATH = r"D:\Users\{username}\Documents\Warcraft III\CustomMapData\\".format(username=_get_username())
# Check if we're running in wsl. If you're working in linux/wsl env, you need to manually change the next 2 lines
if os.path.exists("/mnt/d/Users/Tom"):
CUSTOM_MAP_DATA_PATH = "/mnt/d/Users/Tom/Documents/Warcraft III/CustomMapData/"
# Allow overriding FILES_ROOT via environment variable for testing
FILES_ROOT = os.environ.get('WC3_INTERPRETER_FILES_ROOT', os.path.join(CUSTOM_MAP_DATA_PATH, "Interpreter"))
def set_files_root(path: str) -> None:
"""Set the FILES_ROOT directory. Useful for testing with a temp directory."""
global FILES_ROOT
FILES_ROOT = path
os.makedirs(FILES_ROOT, exist_ok=True)
# find any pattern starting with `call Preload( "]]i([[` and ending with `]])--[[" )` and concatenate the innter strings
REGEX_PATTERN = rb'call Preload\( "\]\]i\(\[\[(.*?)\]\]\)--\[\[" \)'
# find any pattern starting with `call Preload( "` and ending with `" )` and concatenate the innter strings
READ_REGEX_PATTERN = rb'call Preload\( "(.*?)" \)'
FILE_PREFIX = """function PreloadFiles takes nothing returns nothing
call PreloadStart()
call Preload( "")
endfunction
//!beginusercode
local p={} local i=function(s) table.insert(p,s) end--[[" )
"""
FILE_POSTFIX = """\n call Preload( "]]BlzSetAbilityTooltip(1095656547, table.concat(p), 0)
//!endusercode
function a takes nothing returns nothing
//" )
call PreloadEnd( 0.1 )
endfunction
"""
LINE_PREFIX = '\n call Preload( "]]i([['
LINE_POSTFIX = ']])--[[" )'
# Lua wrapper to make OnInit and its submethods execute immediately instead of registering for later
# This is needed because when running code via the interpreter, the OnInit phase has already passed
# The wrapper also captures and returns the result of the file's return statement
ONINIT_IMMEDIATE_WRAPPER = """do
local function _immediateExec(nameOrFunc, func)
local f = func or nameOrFunc
if type(f) == 'function' then f() end
end
local _savedOnInit = OnInit
OnInit = setmetatable({}, {
__call = function(_, ...) _immediateExec(...) end,
__index = function() return _immediateExec end
})
local __wc3_interpreter_result
local function __wc3_interpreter_run()
__wc3_interpreter_result = (function()
"""
ONINIT_IMMEDIATE_WRAPPER_END = """
end)()
end
local ok, err = pcall(__wc3_interpreter_run)
OnInit = _savedOnInit
if not ok then
error(err)
end
return __wc3_interpreter_result
end"""
# Global state for breakpoint handling
bp_command_indices: Dict[str, int] = {} # Maps thread_id to next command index (per-thread counters)
# Unified interface state for breakpoint handling
# When a breakpoint is hit, it becomes the current context and commands go to it
# Additional breakpoints are queued and handled after the current one is continued
# Main thread owns all state - no locks needed
current_breakpoint: Optional[Tuple[str, Dict]] = None # (thread_id, info) of current breakpoint being handled
pending_breakpoints: List[Tuple[str, Dict]] = [] # Queue of breakpoints waiting to be handled
thread_state_is_bp: Dict[str, bool] = {} # Maps thread_id to whether it's currently in a breakpoint
# Queues for inter-thread communication (thread-safe by design)
# Other threads push to these, main thread reads from them
stdin_queue: Queue[str] = Queue() # User input from stdin reader thread
file_change_queue: Queue = Queue() # File change events from watchdog
# Stdin reader thread
stdin_reader_thread: Optional[threading.Thread] = None
stdin_reader_stop_event: threading.Event = threading.Event()
# Field separator for breakpoint data files (ASCII 31 = unit separator)
FIELD_SEP = bytes([31])
if WATCHDOG_AVAILABLE:
class FileChangeHandler(FileSystemEventHandler):
"""Handler for file change events that pushes to a queue for main thread processing"""
def __init__(self, filepath: str, queue: Queue):
super().__init__()
self.filepath = os.path.abspath(filepath)
self.queue = queue
self.last_modified = 0
def on_modified(self, event) -> None:
if event.is_directory:
return
# Check if this is the file we're watching
if os.path.abspath(event.src_path) == self.filepath:
# Debounce: ignore events within 0.5 seconds of each other
current_time = time.time()
if current_time - self.last_modified < 0.5:
return
self.last_modified = current_time
# Push to queue - main thread will handle the actual work
self.queue.put(self.filepath)
class FileWatcher:
"""File watcher that pushes change events to a queue for main thread processing.
No locks needed - this class is only called from the main thread.
The watchdog Observer threads only push to the queue via FileChangeHandler.
"""
# Global state for file watching
watched_files: Dict[str, any] = {} # filepath -> Observer
def start_watching(self, filepath: str, queue: Queue) -> bool:
"""Start watching a file for changes. Returns True if successful.
Args:
filepath: Path to the file to watch
queue: Queue to push file change events to
"""
if not WATCHDOG_AVAILABLE:
print("watch: watchdog is not installed. Install it with `pip install watchdog` to use watch/unwatch.")
return False
filepath = os.path.abspath(filepath)
if not os.path.exists(filepath):
print(f"Error: File does not exist: {filepath}")
return False
if filepath in self.watched_files:
print(f"Already watching: {filepath}")
return False
directory = os.path.dirname(filepath)
handler = FileChangeHandler(filepath, queue)
observer = Observer()
observer.schedule(handler, directory, recursive=False)
observer.start()
self.watched_files[filepath] = observer
print(f"Now watching: {filepath}")
return True
def stop_watching(self, filepath: str) -> bool:
"""Stop watching a file. Returns True if successful."""
filepath = os.path.abspath(filepath)
if filepath not in self.watched_files:
print(f"Not watching: {filepath}")
return False
observer = self.watched_files.pop(filepath)
observer.stop()
observer.join(timeout=1.0)
print(f"Stopped watching: {filepath}")
return True
def stop_all_watchers(self) -> None:
"""Stop all file watchers."""
for filepath, observer in list(self.watched_files.items()):
observer.stop()
observer.join(timeout=1.0)
self.watched_files.clear()
def list_watched_files(self) -> None:
"""List all currently watched files."""
if not self.watched_files:
print("No files being watched.")
else:
print("Currently watching:")
for filepath in self.watched_files:
print(f" {filepath}")
file_watcher = FileWatcher()
# ============================================================================
# Dynamic breakpoint related code:
# ============================================================================
def load_lua_directory(path: str) -> Dict[str, str]:
lua_files: Dict[str, str] = {}
for root, _, files in os.walk(path):
for f in files:
if f.endswith('.lua'):
full_path = os.path.join(root, f)
with open(full_path, 'r', encoding='utf-8') as file:
lua_files[full_path] = file.read()
return lua_files
def find_lua_function(content: str, func_name: str=None, line_number: int=None):
"""
Returns (start_index, end_index, body_text)
"""
lines = content.splitlines(keepends=True)
if line_number is not None:
# Find the nearest 'function ' line before the given line
for i in range(line_number - 1, -1, -1):
if re.match(r'\s*function\b', lines[i]):
func_line = i
break
else:
return None
elif func_name:
# Find by function name
pattern = rf'\bfunction\s+{re.escape(func_name)}\b'
for i, line in enumerate(lines):
if re.search(pattern, line):
func_line = i
break
else:
return None
else:
return None
# Find the matching 'end'
depth = 0
for j in range(func_line, len(lines)):
if re.match(r'\s*function\b', lines[j]):
depth += 1
elif re.match(r'\s*end\b', lines[j]):
depth -= 1
if depth == 0:
start = sum(len(l) for l in lines[:func_line])
end = sum(len(l) for l in lines[:j + 1])
return start, end, ''.join(lines[func_line:j + 1])
return None
def inject_into_function(content: str, start: int, end: int, inject_str: str, after_line: int=None):
func = content[start:end]
func_lines = func.splitlines(keepends=True)
if after_line is None:
# Insert after first line (after function header)
func_lines.insert(1, inject_str + '\n')
else:
func_lines.insert(after_line, inject_str + '\n')
new_func = ''.join(func_lines)
return content[:start] + new_func + content[end:]
def modify_function(lua_files: dict[str,str], func_name: Optional[str] = None, target_file: str='', target_line: Optional[int]=None, inject_str: str=''):
if target_file != '':
content = lua_files[target_file]
else:
# Search for name across all files
for f, c in lua_files.items():
if func_name in c:
target_file = f
content = c
break
else:
raise ValueError("Function not found")
match = find_lua_function(content, func_name=func_name, line_number=target_line)
if not match:
raise ValueError("Could not locate function boundaries")
start, end, _ = match
new_content = inject_into_function(content, start, end, inject_str)
return new_content
# ============================================================================
# File I/O Functions (create/load files in wc3 preload format)
# ============================================================================
def load_file(filename: str) -> Optional[bytes]:
"""loads a file in wc3 preload format (saved by FileIO) and parses it"""
if not os.path.exists(filename):
return None
with open(filename, 'rb') as file:
data = file.read()
# First try the i([[ ... ]])--[[ pattern used by create_file/FileIO
matches = re.findall(REGEX_PATTERN, data, flags=re.DOTALL)
if matches:
return b''.join(matches)
# Fallback to simpler pattern for legacy/simple format
matches = re.findall(READ_REGEX_PATTERN, data, flags=re.DOTALL)
if matches:
return b''.join(matches)
return b''
def create_file(filename: str, content) -> None:
"""
creates a file in wc3 preload format (that can be loaded by FileIO)
@content: The data that will be returned from FileIO after loading this file.
Accepts both str and bytes.
"""
assert len(content) > 0
# Convert to bytes if needed for internal processing
if isinstance(content, str):
content_bytes = content.encode('utf-8')
else:
content_bytes = content
# Build the file content as bytes
data = FILE_PREFIX.encode('utf-8')
line_prefix_bytes = LINE_PREFIX.encode('utf-8')
line_postfix_bytes = LINE_POSTFIX.encode('utf-8')
# Split content into 255 byte chunks
for i in range(0, len(content_bytes), 255):
chunk = content_bytes[i : i+255]
data += line_prefix_bytes + chunk + line_postfix_bytes
data += FILE_POSTFIX.encode('utf-8')
with open(filename, 'wb') as file:
file.write(data)
def remove_all_files() -> None:
"""Removes all input and output files from the FILES_ROOT directory.
If we leave the files there, the next game might read and run them"""
if not os.path.isdir(FILES_ROOT):
return
for filename in os.listdir(FILES_ROOT):
file_path = os.path.join(FILES_ROOT, filename)
# Remove regular files and breakpoint files
if filename.endswith(".txt") and os.path.isfile(file_path):
# Match: in*.txt, out.txt, bp_in.txt, bp_out.txt, bp_threads.txt, bp_data_*.txt
if (filename.startswith("in") or filename == "out.txt" or
filename.startswith("bp_")):
try:
os.unlink(file_path)
except Exception as e:
print(f"Error deleting file {file_path}: {e}")
def parse_nonloadable_file(filename: str) -> Optional[bytes]:
"""Read payload from files saved via FileIO.Save(..., isLoadable=False) in the test environment.
In WC3, these files go through the Preload mechanism and are wrapped in a standard format:
function PreloadFiles takes nothing returns nothing
call PreloadStart()
call Preload( "payload_chunk1" )
call Preload( "payload_chunk2" )
call PreloadEnd( 0.0 )
endfunction
This function parses the preload wrapper and extracts the concatenated payload from all
Preload() calls.
This function is used for breakpoint metadata files (bp_threads.txt, bp_data_*.txt, bp_out.txt)
which are saved with isLoadable=False by LiveCoding.lua.
"""
if not os.path.exists(filename):
return None
with open(filename, 'rb') as file:
content = file.read()
# Parse the preload wrapper and extract payload from call Preload( "..." ) lines
# Pattern matches: call Preload( "..." ) and allows double quotes inside the payload
# The pattern ((?:""|[^"])*) matches either doubled quotes "" or non-quote chars,
# stopping only at the closing " ) sequence
pattern = rb'call Preload\( "((?:""|[^"])*)" \)'
matches = re.findall(pattern, content)
if matches:
return b''.join(matches)
# Check if this looks like a preload wrapper (has PreloadStart/PreloadEnd but no Preload calls)
# This happens when FileIO.Save is called with empty data
assert b'call PreloadStart()' in content and b'call PreloadEnd(' in content
return None
# ============================================================================
# LiveCode output parsers
# ============================================================================
def parse_indexed_output(content: bytes) -> Tuple[Optional[bytes], Optional[bytes]]:
"""Parse output file with format 'index FIELD_SEP result'. Returns (index, result)."""
if not content:
return None, None
lines = content.split(FIELD_SEP, 1)
if len(lines) >= 2:
assert len(lines) == 2, f"unexpected file format - too many separators {content}"
return lines[0], lines[1]
elif len(lines) == 1:
return lines[0], b''
return None, None
def parse_bp_threads_file() -> List[bytes]:
"""Get list of thread IDs currently in a breakpoint by reading bp_threads.txt."""
bp_threads_file = os.path.join(FILES_ROOT, "bp_threads.txt")
if not os.path.exists(bp_threads_file):
return []
content = parse_nonloadable_file(bp_threads_file)
if not content or not content.strip():
return []
return [t.strip() for t in content.split(FIELD_SEP) if t.strip()]
def parse_bp_data_file(thread_id: str) -> Optional[Dict[str, any]]:
"""Get breakpoint info for a specific thread by reading bp_data_<thread_id>.txt.
File format is a single FIELD_SEP-separated record:
bp_id<SEP>value<SEP>stack<SEP>stacktrace<SEP>var1<SEP>val1<SEP>var2<SEP>val2...
Fields are parsed in pairs: key, value, key, value, ...
- bp_id: breakpoint identifier
- stack: stacktrace
- other keys: local variable names with their values
Returns a dict with keys: bp_id, stack, locals_values (dict of name->value).
The 'locals' list is derived from locals_values keys.
"""
bp_data_file = os.path.join(FILES_ROOT, f"bp_data_{thread_id}.txt")
if not os.path.exists(bp_data_file):
return None
content = parse_nonloadable_file(bp_data_file)
if not content:
return None
text = content.strip()
if not text:
return None
# Split by FIELD_SEP and parse in pairs
parts = text.split(FIELD_SEP)
info = {'thread_id': thread_id, 'locals_values': {}}
i = 0
while i + 1 < len(parts):
key, value = parts[i], parts[i + 1]
i += 2
if key == b'bp_id':
info['bp_id'] = value
elif key == b'stack':
# Unescape newlines in stacktrace
info['stack'] = value
else:
# Local variable value
info['locals_values'][key] = value
# Derive locals list from locals_values keys
info['locals'] = list(info['locals_values'].keys())
return info
# ============================================================================
# Breakpoint Related Functions
# ============================================================================
def bp_show_info(thread_id: str):
"""Show detailed breakpoint info for a specific thread."""
info = parse_bp_data_file(thread_id)
if not info:
print(f"No breakpoint info found for thread '{thread_id}'")
return
print(f"Thread: {thread_id}")
print(f"Breakpoint ID: {info.get('bp_id', 'unknown')}")
locals_list = info.get('locals', [])
if locals_list:
print(f"Local variables: {b', '.join(locals_list)}")
for var in locals_list:
if var in info.get('locals_values', {}):
print(f" {var} = {info['locals_values'][var]}")
stack = info.get('stack', '')
if stack:
print(f"Stack trace:\n{stack}")
def check_for_new_breakpoints() -> None:
"""Check for new breakpoints and handle them on the main thread.
This function is called from the main event loop to poll for new breakpoints.
Since it runs on the main thread, no locks are needed.
"""
for thread_id_bytes in parse_bp_threads_file():
thread_id = thread_id_bytes.decode('utf-8', errors='replace')
info = parse_bp_data_file(thread_id)
if not info:
continue
if thread_state_is_bp.get(thread_id, False):
# Thread is already in a breakpoint (we haven't continued yet)
continue
# Mark thread as in breakpoint and handle the event
thread_state_is_bp[thread_id] = True
handle_breakpoint_event(thread_id, info)
def stdin_reader_thread_func() -> None:
"""Background thread that reads stdin and pushes to queue.
Uses blocking input() since this runs in a separate thread.
"""
while not stdin_reader_stop_event.is_set():
try:
line = input()
stdin_queue.put(line)
except EOFError:
break
except Exception as e:
# If stdin is closed or there's an error, exit the thread
print("Stdin reader thread exiting due to error.", e)
break
def start_stdin_reader() -> None:
"""Start the background stdin reader thread."""
global stdin_reader_thread
if stdin_reader_thread is not None and stdin_reader_thread.is_alive():
return
stdin_reader_stop_event.clear()
stdin_reader_thread = threading.Thread(target=stdin_reader_thread_func, daemon=True)
stdin_reader_thread.start()
def stop_stdin_reader() -> None:
"""Stop the background stdin reader thread."""
global stdin_reader_thread
stdin_reader_stop_event.set()
if stdin_reader_thread is not None:
stdin_reader_thread.join(timeout=1.0)
stdin_reader_thread = None
def print_breakpoint_hit(thread_id: str, info: Dict[str, any]) -> None:
"""Print a message when a breakpoint is hit.
Uses flush=True to ensure immediate output when called from background thread.
"""
bp_id = info.get('bp_id', 'unknown')
locals_list = info.get('locals', [])
print("\n" + "=" * 60)
print(f"BREAKPOINT HIT: {bp_id}")
print(f"Thread: {thread_id}")
if locals_list:
print(f"Local variables: {b', '.join(locals_list)}")
print("Type 'help' for commands, 'continue' to resume, or enter Lua code")
print("=" * 60)
print(f"{nextFile} >>> ", flush=True, end='')
def get_prompt()-> str:
"""Get the appropriate prompt based on current context.
No lock needed - only called from main thread.
"""
if current_breakpoint is not None:
thread_id = current_breakpoint[0]
short_id = thread_id[:8] if len(thread_id) > 8 else thread_id
return f"bp:{short_id}... >>> "
else:
return f"{nextFile} >>> "
def clear_state():
"""Clean up all state and stop background threads."""
stop_stdin_reader()
file_watcher.stop_all_watchers()
remove_all_files()
def signal_handler(sig: int, frame) -> None:
"""On any termination of the program we want to remove the input and output files and stop watchers"""
clear_state()
sys.exit(0)
nextFile: int = 0
def send_data_to_game(data: str, print_prompt_after: bool = False) -> Optional[str]:
"""Send data to the game and wait for response.
This is the unified interface for both normal and breakpoint modes.
- In normal mode: Uses in{N}.txt and out.txt with format "{index}FIELD_SEP{result}"
- In breakpoint mode: Uses bp_in_{thread_id}_{idx}.txt and bp_out.txt with format "{thread_id}:{cmd_index}FIELD_SEP{result}"
No locks needed - only called from main thread.
Args:
data: The Lua code to send to the game
print_prompt_after: If True, print the prompt after the result (used for file/watch commands)
Returns:
The result string from the game, or None if no response/timeout
"""
global nextFile, bp_command_indices
if data == "":
return None
# Check if we're in breakpoint context
in_breakpoint = current_breakpoint is not None
if in_breakpoint:
thread_id = current_breakpoint[0]
cmd_index = bp_command_indices.get(thread_id, 0)
bp_command_indices[thread_id] = cmd_index + 1
expected_prefix = f"{thread_id}:{cmd_index}"
# Breakpoint mode: use bp_in/bp_out files
in_file = os.path.join(FILES_ROOT, f"bp_in_{thread_id}_{cmd_index}.txt")
out_file = os.path.join(FILES_ROOT, "bp_out.txt")
else:
thread_id = ""
cmd_index = nextFile
nextFile += 1
expected_prefix = f"{cmd_index}"
# Normal mode: use in/out files
in_file = os.path.join(FILES_ROOT, f"in{cmd_index}.txt")
out_file = os.path.join(FILES_ROOT, "out.txt")
create_file(in_file, data)
debug = os.environ.get('WC3_E2E_DEBUG')
if debug:
print(f"[DEBUG] send_data_to_game thread_id={thread_id}, cmd_index={cmd_index}. Wrote command to: {in_file}. Waiting for response with prefix: {expected_prefix}")
start_time = time.time()
timeout = 20
while time.time() - start_time < timeout:
if os.path.exists(out_file):
content = parse_nonloadable_file(out_file)
if content:
index, result = parse_indexed_output(content)
if debug:
print(f"[DEBUG] {out_file} content (first 100 bytes): {content[:100]}. Parsed index: {index}, expected: {expected_prefix}")
if index and index.decode('utf-8', errors='replace') == expected_prefix:
if debug:
print(f"[DEBUG] Got matching response!")
result_str = result.decode('utf-8', errors='replace') if result else None
if result_str and result_str != "nil":
print(result_str)
if print_prompt_after:
print(get_prompt(), end="", flush=True)
return result_str
time.sleep(0.1)
if debug:
print(f"[DEBUG] TIMEOUT waiting for {expected_prefix}. {out_file} exists: {os.path.exists(out_file)}")
if os.path.exists(out_file):
content = parse_nonloadable_file(out_file)
print(f"[DEBUG] Final {out_file} content: {content}")
print(f"Timeout waiting for response")
if print_prompt_after:
print(get_prompt(), end="", flush=True)
return None
def wrap_with_oninit_immediate(content: str) -> str:
"""Wraps Lua content with OnInit immediate execution wrapper"""
return ONINIT_IMMEDIATE_WRAPPER + content + ONINIT_IMMEDIATE_WRAPPER_END
def send_file_to_game(filepath: str) -> None:
"""Send a file to the game with OnInit wrapper applied. Used by both 'file' command and watch callbacks."""
if not os.path.exists(filepath):
print(f"Error: File does not exist: {filepath}")
return
with open(filepath, 'r', encoding='utf-8') as f:
data = f.read()
# Wrap with OnInit immediate execution wrapper
data = wrap_with_oninit_immediate(data)
print(f"Sending file {filepath} to game")
send_data_to_game(data, print_prompt_after=True)
def handle_continue_command() -> None:
"""Handle the 'continue' command to resume execution of current breakpoint thread.
No locks needed - only called from main thread.
"""
global current_breakpoint, pending_breakpoints
if current_breakpoint is None:
print("Not in a breakpoint context.")
return
thread_id = current_breakpoint[0]
# Send continue command to game via unified interface
send_data_to_game("continue")
print(f"Resuming thread {thread_id}...")
# Clean up state for this thread
if thread_id in thread_state_is_bp:
del thread_state_is_bp[thread_id]
# Move to next pending breakpoint if any
if pending_breakpoints:
next_bp = pending_breakpoints.pop(0)
current_breakpoint = next_bp
print_breakpoint_hit(next_bp[0], next_bp[1])
if pending_breakpoints:
print(f"[{len(pending_breakpoints)} more breakpoint(s) pending]")
else:
# No more breakpoints - clear current context
current_breakpoint = None
print("[Returned to normal command mode]")
def handle_thread_command(new_thread_id: str) -> None:
"""Handle the 'thread <id>' command to switch to a different breakpoint thread.
No locks needed - only called from main thread.
"""
global current_breakpoint
threads = parse_bp_threads_file()
threads_str = [t.decode('utf-8', errors='replace') for t in threads]
if new_thread_id in threads_str:
new_info = parse_bp_data_file(new_thread_id)
if new_info:
current_breakpoint = (new_thread_id, new_info)
print(f"Switched to thread {new_thread_id}")
print(f"Breakpoint: {new_info.get('bp_id', 'unknown')}")
new_locals = new_info.get('locals', [])
if new_locals:
print(f"Local variables: {b', '.join(new_locals)}")
else:
print(f"Switched to thread {new_thread_id} (no info available)")
current_breakpoint = (new_thread_id, {'thread_id': new_thread_id})
else:
print(f"Thread '{new_thread_id}' not found in breakpoint.")
print(f"Available threads: {', '.join(threads_str) if threads_str else 'none'}")
def handle_command(cmd: str) -> bool:
"""Handle a single command from the user. Returns False to exit."""
global nextFile, bp_command_indices, current_breakpoint, pending_breakpoints
# Check if we're in breakpoint context for context-aware help
in_bp_mode = current_breakpoint is not None
if in_bp_mode:
thread_id = current_breakpoint[0]
splitted = cmd.strip().split()
if len(splitted) == 0:
return True
main_cmd = splitted[0]
args = " ".join(splitted[1:]) # note that these are just general args, they do not apply to all commands
# Unified command handling - same commands work in both modes
if main_cmd == "quit" or main_cmd == "q":
clear_state()
return False
if main_cmd == "help" or main_cmd == "h":
print("Available commands:")
print(" help/h - Show this help message")
print(" quit/q - Exit the program")
print(" restart/r - Cleans the state to allow a new game to be started (this is the same as exiting and restarting the script)")
print(" jump/j <number> - use in case of closing the interpreter (or crashing) while game is still running. Starts sending commands from a specific file index. Should use the index printed in the prompt before the `>>>`")
print(" file <full file path> - send a file with lua commands to the game. end the file with `return <data>` to print the data to the console")
print(" watch <full file path> - watch a file for changes and automatically send it to the game on each update")
print(" unwatch <full file path> - stop watching a file")
print(" watching - list all files currently being watched")
print(" list/l - list all threads currently in a breakpoint (current marked with *)")
print(" thread/t <id> - switch to a different breakpoint thread")
print(" info/i - show detailed info for current breakpoint thread")
print(" continue/c - resume execution of current breakpoint thread")
print(" <lua command> - run a lua command in the game. If the command is a `return` statement, the result will be printed to the console.")
print("** Note: exiting or restarting the script while the game is running will cause it to stop working until the game is also restarted **")
print("** Note: OnInit calls in files sent via 'file' or 'watch' are automatically executed immediately **")
if in_bp_mode:
print(f"\n[Currently in breakpoint context: thread {thread_id}]")
return True
if main_cmd == "restart":
clear_state()
nextFile = 0
bp_command_indices = {}
current_breakpoint = None
pending_breakpoints = []
thread_state_is_bp.clear()
start_stdin_reader()
print("State reset. You can start a new game now.")
return True
if main_cmd == "jump" or main_cmd == "j":
nextFile = args
return True
if main_cmd == "watch":
filepath = args
file_watcher.start_watching(filepath, file_change_queue)
return True
if main_cmd == "unwatch":
filepath = args
file_watcher.stop_watching(filepath)
return True
if main_cmd == "watching":
file_watcher.list_watched_files()
return True
if main_cmd == "file":
filepath = args
send_file_to_game(filepath)
return True
if main_cmd == "list" or main_cmd == "l":
# List all threads in breakpoint (with current marked)
threads = parse_bp_threads_file()
if not threads:
print("No threads currently in a breakpoint.")
else:
print(f"Threads in breakpoint ({len(threads)}):")
for tid in threads:
tid_str = tid.decode('utf-8', errors='replace')
bp_info = parse_bp_data_file(tid_str)
bp_name = bp_info.get('bp_id', 'unknown') if bp_info else 'unknown'
marker = " *" if in_bp_mode and tid_str == thread_id else ""
print(f" {tid_str}: breakpoint '{bp_name}'{marker}")
return True
if main_cmd == "thread" or main_cmd == "t":
new_thread_id = args
handle_thread_command(new_thread_id)
return True
if main_cmd == "info" or main_cmd == "i":
if in_bp_mode:
bp_show_info(thread_id)
else:
print("Not in a breakpoint context. Use 'list' to see available threads.")
return True
if main_cmd == "continue" or main_cmd == "c":
if in_bp_mode:
handle_continue_command()
else:
print("Not in a breakpoint context.")
return True
# Send Lua command to game via unified interface
send_data_to_game(cmd)
return True
def handle_breakpoint_event(thread_id: str, info: Dict[str, any]) -> None:
"""Handle a new breakpoint event.
Called by check_for_new_breakpoints() when a new breakpoint is detected.
"""
global current_breakpoint, pending_breakpoints
# Update state
if current_breakpoint is None:
current_breakpoint = (thread_id, info)
else:
pending_breakpoints.append((thread_id, info))
# Print the BREAKPOINT HIT message
print_breakpoint_hit(thread_id, info)
# If this is a queued breakpoint, add a note about context
if current_breakpoint[0] != thread_id:
print(f"[Note: this breakpoint is queued; current context stays at {current_breakpoint[0][:8]}... until you 'continue']", flush=True)
def handle_file_change_event(filepath: str) -> None:
"""Handle a file change event from the queue.
Called by main thread when file_change_queue has data.
"""
print(f"\n[watch] File changed: {filepath}")
send_file_to_game(filepath)
print(get_prompt(), end="", flush=True)
def main() -> None:
"""Main event loop.
The main thread does all the work:
- Processes user input from stdin_queue (stdin reader thread pushes to queue)
- Processes file change events from file_change_queue (watchdog threads push to queue)
- Polls for new breakpoints directly (no separate thread needed)
No locks needed since breakpoint monitoring runs on the main thread.
"""
global current_breakpoint, pending_breakpoints
remove_all_files()
# add a signal handler that handles all signals by removing all files and calling the default handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGABRT, signal_handler)
signal.signal(signal.SIGSEGV, signal_handler)
signal.signal(signal.SIGILL, signal_handler)
# Start background stdin reader thread
start_stdin_reader()
print(f"Wc3 Interpreter {VERSION}. For help, type `help`.")
print(get_prompt(), end="", flush=True)
while True:
# Check for new breakpoints
check_for_new_breakpoints()
# Check for file changes
while not file_change_queue.empty():
filepath = file_change_queue.get_nowait()
handle_file_change_event(filepath)
# Check for user input (with short timeout to stay responsive)
if not stdin_queue.empty():
command = stdin_queue.get()
cmd = command.strip()
if cmd == "":
print(get_prompt(), end="", flush=True)
continue
if not handle_command(cmd):
break
print(get_prompt(), end="", flush=True)
if __name__ == "__main__":
main()
if Debug then Debug.beginFile("FileIO") end
--[[
Optimized FileIO v1.0.0 by Tomotz
Based on Trokkin's version (https://www.hiveworkshop.com/threads/fileio-lua-optimized.347049/)
- Read and write files with any data.
- This version allows writing/reading any data including characters that the preload natives can't work with, by escaping them.
- Allow Saving with the `-isLoadable` that will make the file look nicer if you never plan to load it.
It's similar to Antares's Stable Lua FileIO, only it allows writing null terminators as well.
API:
FileIO.Save(filename, data, isLoadable?)
- Write string data to a file
---@param isLoadable boolean? -- Use false only if you never plan to load the file to make it format nicer and be more size efficient. Default is true.
FileIO.Load(filename) -> string?
- Read string data from a file. Returns nil if file doesn't exist.
FileIO.SaveAsserted(filename, data, onFail?) -> bool
- Saves the file and checks that it was saved successfully.
If it fails, passes (filename, data, loadResult) to onFail.
FileIO.enabled : bool
- field that indicates that files can be accessed correctly.
Requirements:
StringEscape by Tomotz @ https://www.hiveworkshop.com/threads/optimized-syncstream-and-stringescape.367925/
Total Initialization by Bribe @ https://www.hiveworkshop.com/threads/317099/
Optional requirements:
DebugUtils by Eikonium @ https://www.hiveworkshop.com/threads/330758/
Patch by Tomotz 1 Mar 2025:
- Added escaping to all characters unsupported by FileIO. Those include null terminator (for saving and loading),
and line feed, backslash, closing square bracket.
- Added optional parameter isLoadable to savefile, which defaults to true. If false, the file will have nicer format -
less character are escaped, and there are less extra characters added by FileIO. This also means there is more room for user
data in each Preload call. Such files will not be loadable with FileIO.Load.
--]]
local RAW_PREFIX = ']]i([['
local RAW_SUFFIX = ']])--[['
local MAX_PRELOAD_SIZE = 256
MAX_TEXT_SAVE = MAX_PRELOAD_SIZE
MAX_TEXT_LOAD = MAX_PRELOAD_SIZE - #RAW_PREFIX - #RAW_SUFFIX
local LOAD_ABILITY = FourCC('ANdc')
local LOAD_EMPTY_KEY = '!@#$, empty data'
local name = nil ---@type string?
-- carriage return seems to turn into new line when written and read back
FileIO_unsupportedLoadChars = {0, 10, 13, 92, 93} -- null terminator, line feed, carriage return, slash, closing square bracket
FileIO_unsupportedSaveChars = {0} -- only null terminator is not supported when saving. \ becomed \\ though, so the string becomes longer and we lose the last characters in the Preload
---@param filename any
---@param isLoadable any
local function open(filename, isLoadable)
-- turns out you can't save a file without an extension.
if Debug then Debug.assert(filename:find('.', 1, true), "FileIO: filename must have an extension") end
name = filename
PreloadGenClear()
if isLoadable then
Preload('")\nendfunction\n//!beginusercode\nlocal p={} local i=function(s) table.insert(p,s) end--[[')
end
end
local function write(s, isLoadable)
local maxSize = isLoadable and MAX_TEXT_LOAD or MAX_TEXT_SAVE
local prefix = isLoadable and RAW_PREFIX or ''
local suffix = isLoadable and RAW_SUFFIX or ''
local curPos = 1
while curPos <= #s do
local chunk = s:sub(curPos, curPos + maxSize - 1)
local lastChar = #chunk
if not isLoadable then
-- handle \ characters which are escaped as \\ in preload (for loadable files, we replace the \ with unprintable char)
local _, numSlash = chunk:gsub("[\\]", "")
local curLen = lastChar + numSlash -- This is the actuall length the chunk will take in preload
while curLen > maxSize and lastChar > 0 do
local char = chunk:sub(lastChar, lastChar)
if char == '\\' then
curLen = curLen - 1
end
curLen = curLen - 1
lastChar = lastChar - 1
end
end
chunk = chunk:sub(1, lastChar)
Preload(prefix .. chunk .. suffix)
curPos = curPos + #chunk
end
end
local function close(isLoadable)
if isLoadable then
Preload(']]BlzSetAbilityTooltip(' ..
LOAD_ABILITY .. ', table.concat(p), 0)\n//!endusercode\nfunction a takes nothing returns nothing\n//')
end
PreloadGenEnd(name --[[@as string]])
name = nil
end
---@param filename string
---@param data string
---@param isLoadable boolean? -- Use false only if you never plan to load the file. Default is true.
-- This controls which characters are escaped and replaced. For loadable files, we must remove more characters.
local function savefile(filename, data, isLoadable)
if isLoadable == nil then
isLoadable = true
end
local unsupportedChars ---@type integer[]
if isLoadable then
unsupportedChars = FileIO_unsupportedLoadChars
else
unsupportedChars = FileIO_unsupportedSaveChars
end
local data2 = AddEscaping(data, unsupportedChars)
open(filename, isLoadable)
write(data2, isLoadable)
close(isLoadable)
end
---@param filename string
---@return string?
local function loadfile(filename)
local s = BlzGetAbilityTooltip(LOAD_ABILITY, 0)
BlzSetAbilityTooltip(LOAD_ABILITY, LOAD_EMPTY_KEY, 0)
Preloader(filename)
local loaded = BlzGetAbilityTooltip(LOAD_ABILITY, 0)
if loaded == LOAD_EMPTY_KEY then
return nil
end
return RemoveEscaping(loaded, FileIO_unsupportedLoadChars)
end
---@param filename string
---@param data string
---@param onFail nil | fun(filename:string, data:string, loadResult:string?)
---@return boolean
local function saveAsserted(filename, data, onFail)
savefile(filename, data)
local res = loadfile(filename)
if res == data then
return true
end
if onFail then
onFail(filename, data, res)
end
return false
end
FileIO = {
Save = savefile,
Load = loadfile,
SaveAsserted = saveAsserted,
}
OnInit.global(function()
FileIO.enabled = saveAsserted('TestFileIO.pld', 'FileIO is Enabled')
end)
if Debug then Debug.endFile() end