
[Lua] Task Processor

Inspired by the way Chopinski's Relativistic Missile System processes missiles, I came up with the idea of mimicking an OS's process scheduling inside the Lua runtime. The goal is to eliminate "processing" lag, at the cost of giving up the guarantee that code executes immediately when called.

At the moment it doesn't look as impressive as it sounds. There is no complicated scheduling strategy or task priority implemented yet. For now I've opted for JavaScript-style asynchronous code handling, similar to Promises, by using Rx.lua.
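To make the general idea concrete, here is a rough sketch of deferred, sliced execution using plain coroutines. It is not the actual task processor and it does not use Rx.lua; `TaskQueue`, `budget`, `add` and `tick` are hypothetical names made up for this illustration. In a map, `tick` would presumably be driven by a periodic timer; here it is simply called in a loop.

```lua
-- Hypothetical sketch: a round-robin task queue built on coroutines.
-- None of these names come from the task processor itself or from RxLua.
local TaskQueue = {}
TaskQueue.__index = TaskQueue

function TaskQueue.new(budget)
  -- budget: maximum number of task resumes performed per tick
  return setmetatable({ tasks = {}, budget = budget or 1 }, TaskQueue)
end

-- Queue a function; it runs later, in slices separated by coroutine.yield().
function TaskQueue:add(fn)
  self.tasks[#self.tasks + 1] = coroutine.create(fn)
end

-- Resume queued tasks in round-robin order until the budget is spent.
function TaskQueue:tick()
  local spent = 0
  while spent < self.budget and #self.tasks > 0 do
    local task = table.remove(self.tasks, 1)
    local ok, err = coroutine.resume(task)
    spent = spent + 1
    if not ok then
      print('task failed: ' .. tostring(err))
    elseif coroutine.status(task) ~= 'dead' then
      self.tasks[#self.tasks + 1] = task -- not finished: back of the line
    end
  end
  return #self.tasks -- number of tasks still pending
end

-- Usage: two tasks that each do three slices of work.
local queue = TaskQueue.new(2)
local log = {}
for _, name in ipairs({ 'A', 'B' }) do
  queue:add(function()
    for i = 1, 3 do
      log[#log + 1] = name .. i
      coroutine.yield() -- give other tasks a turn
    end
  end)
end

while queue:tick() > 0 do end
```

The trade-off is the one described above: nothing runs at the moment `add` is called, but no single tick does more than `budget` slices of work, so a heavy job is spread over several ticks instead of stalling one of them.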

Lua:
-- RxLua v0.0.3
-- https://github.com/bjornbytes/rxlua
-- MIT License
-- Changed annotations to EmmyLua style and potentially fixed a few things.

do
local util = {}

util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
---@diagnostic disable-next-line: deprecated
util.unpack = table.unpack or unpack
util.eq = function(x, y) return x == y end
util.noop = function() end
util.identity = function(x) return x end
util.constant = function(x) return function() return x end end
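util.isa = function(object, class)
  if type(object) ~= 'table' then return false end
  -- Guard against plain tables that have no metatable.
  local mt = getmetatable(object)
  return mt ~= nil and mt.__index == class
end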
util.tryWithObserver = function(observer, fn, ...)
  local success, result = pcall(fn, ...)
  if not success then
    observer:onError(result)
  end
  return success, result
end

--- @class Subscription
--- @field action function
--- @field unsubscribed boolean
-- A handle representing the link between an Observer and an Observable, as well as any
-- work required to clean up after the Observable completes or the Observer unsubscribes.
Subscription = {}
Subscription.__index = Subscription
Subscription.__tostring = util.constant('Subscription')

-- Creates a new Subscription.
---@param action fun(subscription: Subscription)? The action to run when the subscription is unsubscribed. It will only be run once.
---@return Subscription
function Subscription.create(action)
  local self = {
    action = action or util.noop,
    unsubscribed = false
  }

  return setmetatable(self, Subscription)
end

--- Unsubscribes the subscription, performing any necessary cleanup work.
function Subscription:unsubscribe()
  if self.unsubscribed then return end
  self.action(self)
  self.unsubscribed = true
end

--- @class Observer
--- @field _onNext fun(values: ...)
--- @field _onError fun(message: any, level: integer?)
--- @field _onCompleted fun()
--- @field stopped boolean
-- Observers are simple objects that receive values from Observables.
Observer = {}
Observer.__index = Observer
Observer.__tostring = util.constant('Observer')

--- Creates a new Observer.
--- @param onNext fun(...)? Called when the Observable produces a value.
--- @param onError fun(message: string)? Called when the Observable terminates due to an error.
--- @param onCompleted fun()? Called when the Observable completes normally.
--- @return Observer
function Observer.create(onNext, onError, onCompleted)
  local self = {
    _onNext = onNext or util.noop,
    _onError = onError or error,
    _onCompleted = onCompleted or util.noop,
    stopped = false
  }

  return setmetatable(self, Observer)
end

--- Pushes zero or more values to the Observer.
--- @param ... any
function Observer:onNext(...)
  if not self.stopped then
    self._onNext(...)
  end
end

--- Notify the Observer that an error has occurred.
--- @param message string? A string describing what went wrong.
function Observer:onError(message)
  if not self.stopped then
    self.stopped = true
    self._onError(message)
  end
end

--- Notify the Observer that the sequence has completed and will produce no more values.
function Observer:onCompleted()
  if not self.stopped then
    self.stopped = true
    self._onCompleted()
  end
end

--- @class Observable
--- @field _subscribe function
-- Observables push values to Observers.
Observable = {}
Observable.__index = Observable
Observable.__tostring = util.constant('Observable')

--- Creates a new Observable.
--- @param subscribe fun(observer: Observer): any? The subscription function that produces values.
--- @return Observable
function Observable.create(subscribe)
  local self = {
    _subscribe = subscribe
  }

  return setmetatable(self, Observable)
end

--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
--- @param onNext table|fun(...)? Called when the Observable produces a value.
--- @param onError fun(message: string)? Called when the Observable terminates due to an error.
--- @param onCompleted fun()? Called when the Observable completes normally.
function Observable:subscribe(onNext, onError, onCompleted)
  if type(onNext) == 'table' then
    return self._subscribe(onNext)
  else
    return self._subscribe(Observer.create(onNext, onError, onCompleted))
  end
end

--- Returns an Observable that immediately completes without producing a value.
function Observable.empty()
  ---@param observer Observer
  return Observable.create(function(observer)
    observer:onCompleted()
  end)
end

--- Returns an Observable that never produces values and never completes.
function Observable.never()
  return Observable.create(function(observer) end)
end

--- Returns an Observable that immediately produces an error.
--- @param message string
function Observable.throw(message)
    --- @param observer Observer
  return Observable.create(function(observer)
    observer:onError(message)
  end)
end

--- Creates an Observable that produces a set of values.
--- @param ... any
--- @return Observable
function Observable.of(...)
  local args = {...}
  local argCount = select('#', ...)
  return Observable.create(function(observer)
    for i = 1, argCount do
      observer:onNext(args[i])
    end

    observer:onCompleted()
  end)
end

--- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
--- @param initial integer The first value of the range, or the upper limit if no other arguments are specified.
--- @param limit integer The second value of the range.
--- @param step integer An amount to increment the value by each iteration.
--- @return Observable
function Observable.fromRange(initial, limit, step)
  if not limit and not step then
    initial, limit = 1, initial
  end

  step = step or 1

  return Observable.create(function(observer)
    for i = initial, limit, step do
      observer:onNext(i)
    end

    observer:onCompleted()
  end)
end

--- Creates an Observable that produces values from a table.
--- @generic K, V
--- @param t table The table used to create the Observable.
--- @param iterator fun(table: table<K, V>, index?: K):K, V An iterator used to iterate the table, e.g. pairs or ipairs.
--- @param keys boolean Whether or not to also emit the keys of the table.
--- @return Observable
function Observable.fromTable(t, iterator, keys)
  iterator = iterator or pairs
  ---@param observer Observer
  return Observable.create(function(observer)
    for key, value in iterator(t) do
      observer:onNext(value, keys and key or nil)
    end

    observer:onCompleted()
  end)
end

--- Creates an Observable that produces values when the specified coroutine yields.
--- @param fn thread|function A coroutine or function to use to generate values.  Note that if a coroutine is used, the values it yields will be shared by all subscribed Observers (influenced by the Scheduler), whereas a new coroutine will be created for each Observer when a function is used.
--- @param scheduler Scheduler The scheduler used to resume the coroutine.
--- @return Observable
function Observable.fromCoroutine(fn, scheduler)
  return Observable.create(function(observer)
    local thread = type(fn) == 'function' and coroutine.create(fn) or fn
    return scheduler:schedule(function()
      while not observer.stopped do
        local success, value = coroutine.resume(thread --[[@as thread]])

        if success then
          observer:onNext(value)
        else
          return observer:onError(value)
        end

        if coroutine.status(thread --[[@as thread]]) == 'dead' then
          return observer:onCompleted()
        end

        coroutine.yield()
      end
    end)
  end)
end

-- Disabled (banned): this constructor depends on the io library, so it is left commented out.
----- Creates an Observable that produces values from a file, line by line.
----- @param filename string The name of the file used to create the Observable
----- @return Observable
-- function Observable.fromFileByLine(filename)
--   return Observable.create(function(observer)
--     local file = io.open(filename, 'r')
--     if file then
--       file:close()

--       for line in io.lines(filename) do
--         observer:onNext(line)
--       end

--       return observer:onCompleted()
--     else
--       return observer:onError(filename)
--     end
--   end)
-- end

--- Creates an Observable that creates a new Observable for each observer using a factory function.
--- @param fn fun():Observable A function that returns an Observable.
--- @return Observable
function Observable.defer(fn)
  if not fn or type(fn) ~= 'function' then
    error('Expected a function')
  end

  return setmetatable({
    subscribe = function(_, ...)
      local observable = fn()
      return observable:subscribe(...)
    end
  }, Observable)
end

--- Returns an Observable that repeats a value a specified number of times.
--- @param value any - The value to repeat.
--- @param count number - The number of times to repeat the value.  If left unspecified, the value is repeated an infinite number of times.
--- @return Observable
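function Observable.replicate(value, count)
  return Observable.create(function(observer)
    -- Use a per-subscription counter so every subscriber receives `count` values.
    local remaining = count
    while remaining == nil or remaining > 0 do
      observer:onNext(value)
      if remaining then
        remaining = remaining - 1
      end
    end
    observer:onCompleted()
  end)
end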

--- Subscribes to this Observable and prints values it produces.
--- @param name string - Prefixes the printed messages with a name.
--- @param formatter function - A function that formats one or more values to be printed. Default is tostring.
function Observable:dump(name, formatter)
  name = name and (name .. ' ') or ''
  formatter = formatter or tostring

  local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  local onError = function(e) print(name .. 'onError: ' .. e) end
  local onCompleted = function() print(name .. 'onCompleted') end

  return self:subscribe(onNext, onError, onCompleted)
end

--- Determine whether all items emitted by an Observable meet some criteria.
--- @param predicate fun(x: any): any - The predicate used to evaluate objects. Default is util.identity
function Observable:all(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        if not predicate(...) then
          observer:onNext(false)
          observer:onCompleted()
        end
      end, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(true)
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Given a set of Observables, produces values from only the first one to produce a value.
---@param a Observable
---@param b Observable
---@param ... Observable
---@return Observable
function Observable.amb(a, b, ...)
  if not a or not b then return a end

  return Observable.create(function(observer)
    local subscriptionA, subscriptionB

    local function onNextA(...)
      if subscriptionB then subscriptionB:unsubscribe() end
      observer:onNext(...)
    end

    local function onErrorA(e)
      if subscriptionB then subscriptionB:unsubscribe() end
      observer:onError(e)
    end

    local function onCompletedA()
      if subscriptionB then subscriptionB:unsubscribe() end
      observer:onCompleted()
    end

    local function onNextB(...)
      if subscriptionA then subscriptionA:unsubscribe() end
      observer:onNext(...)
    end

    local function onErrorB(e)
      if subscriptionA then subscriptionA:unsubscribe() end
      observer:onError(e)
    end

    local function onCompletedB()
      if subscriptionA then subscriptionA:unsubscribe() end
      observer:onCompleted()
    end

    subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
    subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)

    return Subscription.create(function()
      subscriptionA:unsubscribe()
      subscriptionB:unsubscribe()
    end)
  end):amb(...)
end

--- Returns an Observable that produces the average of all values produced by the original.
--- @return Observable
function Observable:average()
  return Observable.create(function(observer)
    local sum, count = 0, 0

    local function onNext(value)
      sum = sum + value
      count = count + 1
    end

    local function onError(e)
      observer:onError(e)
    end

    local function onCompleted()
      if count > 0 then
        observer:onNext(sum / count)
      end

      observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that buffers values from the original and produces them as multiple
-- values.
--- @param size number - The size of the buffer.
function Observable:buffer(size)
  if not size or type(size) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local buffer = {}

    local function emit()
      if #buffer > 0 then
        observer:onNext(util.unpack(buffer))
        buffer = {}
      end
    end

    local function onNext(...)
      local values = {...}
      for i = 1, #values do
        table.insert(buffer, values[i])
        if #buffer >= size then
          emit()
        end
      end
    end

    local function onError(message)
      emit()
      return observer:onError(message)
    end

    local function onCompleted()
      emit()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that intercepts any errors from the previous and replaces them with values
-- produced by a new Observable.
--- @param handler function|Observable - An Observable or a function that returns an Observable to replace the source Observable in the event of an error.
--- @return Observable
function Observable:catch(handler)
  handler = handler and (type(handler) == 'function' and handler or util.constant(handler))

  return Observable.create(function(observer)
    local subscription

    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(e)
      if not handler then
        return observer:onCompleted()
      end

      local success, _continue = pcall(handler, e)
      if success and _continue then
        if subscription then subscription:unsubscribe() end
        _continue:subscribe(observer)
      else
        observer:onError(success and e or _continue)
      end
    end

    local function onCompleted()
      observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end

--- Returns a new Observable that runs a combinator function on the most recent values from a set
-- of Observables whenever any of them produce a new value. The results of the combinator function
-- are produced by the new Observable.
--- @param ... Observable|function - One or more Observables to combine, optionally followed by a function that combines the latest result from each Observable and returns a single value.
--- @return Observable
function Observable:combineLatest(...)
  local sources = {...}
  local combinator = table.remove(sources)
  if type(combinator) ~= 'function' then
    table.insert(sources, combinator)
    combinator = function(...) return ... end
  end
  table.insert(sources, 1, self)

  return Observable.create(function(observer)
    local latest = {}
    local pending = {util.unpack(sources)}
    local completed = {}
    local subscription = {}

    local function onNext(i)
      return function(value)
        latest[i] = value
        pending[i] = nil

        if not next(pending) then
          util.tryWithObserver(observer, function()
            observer:onNext(combinator(util.unpack(latest)))
          end)
        end
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted(i)
      return function()
        table.insert(completed, i)

        if #completed == #sources then
          observer:onCompleted()
        end
      end
    end

    for i = 1, #sources do
      subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
    end

    return Subscription.create(function ()
      for i = 1, #sources do
        if subscription[i] then subscription[i]:unsubscribe() end
      end
    end)
  end)
end

--- Returns a new Observable that produces the values of the first with falsy values removed.
--- @return Observable
function Observable:compact()
  return self:filter(util.identity)
end

--- Returns a new Observable that produces the values produced by all the specified Observables in
-- the order they are specified.
--- @param other Observable - Observable to concatenate.
--- @param ... Observable - The Observables to concatenate.
--- @return Observable
function Observable:concat(other, ...)
  if not other then return self end

  local others = {...}

  return Observable.create(function(observer)
    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    local function chain()
      return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
    end

    return self:subscribe(onNext, onError, chain)
  end)
end

--- Returns a new Observable that produces a single boolean value representing whether or not the
-- specified value was produced by the original.
--- @param value any - The value to search for.  == is used for equality testing.
--- @return Observable
function Observable:contains(value)
  return Observable.create(function(observer)
    local subscription

    local function onNext(...)
      local args = util.pack(...)

      if #args == 0 and value == nil then
        observer:onNext(true)
        if subscription then subscription:unsubscribe() end
        return observer:onCompleted()
      end

      for i = 1, #args do
        if args[i] == value then
          observer:onNext(true)
          if subscription then subscription:unsubscribe() end
          return observer:onCompleted()
        end
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(false)
      return observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end

--- Returns an Observable that produces a single value representing the number of values produced
-- by the source Observable that satisfy an optional predicate.
--- @param predicate function - The predicate used to match values.
function Observable:count(predicate)
  predicate = predicate or util.constant(true)

  return Observable.create(function(observer)
    local count = 0

    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        if predicate(...) then
          count = count + 1
        end
      end, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(count)
      observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new throttled Observable that waits to produce values until a timeout has expired, at
-- which point it produces the latest value from the source Observable.  Whenever the source
-- Observable produces a value, the timeout is reset.
--- @param time number - An amount in milliseconds to wait before producing the last value.
--- @param scheduler Scheduler - The scheduler to run the Observable on.
--- @return Observable
function Observable:debounce(time, scheduler)
  time = time or 0

  return Observable.create(function(observer)
    local debounced = {}

    local function wrap(key)
      return function(...)
        if debounced[key] then
          debounced[key]:unsubscribe()
        end

        local values = util.pack(...)

        debounced[key] = scheduler:schedule(function()
          return observer[key](observer, util.unpack(values))
        end, time)
      end
    end

    local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))

    return Subscription.create(function()
      if subscription then subscription:unsubscribe() end
      for _, timeout in pairs(debounced) do
        timeout:unsubscribe()
      end
    end)
  end)
end

--- Returns a new Observable that produces a default set of items if the source Observable produces
-- no values.
--- @param ... any - Zero or more values to produce if the source completes without emitting anything.
--- @return Observable
function Observable:defaultIfEmpty(...)
  local defaults = util.pack(...)

  return Observable.create(function(observer)
    local hasValue = false

    local function onNext(...)
      hasValue = true
      observer:onNext(...)
    end

    local function onError(e)
      observer:onError(e)
    end

    local function onCompleted()
      if not hasValue then
        observer:onNext(util.unpack(defaults))
      end

      observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces the values of the original delayed by a time period.
--- @param time number|function - An amount in milliseconds to delay by, or a function which returns this value.
--- @param scheduler Scheduler - The scheduler to run the Observable on.
--- @return Observable
function Observable:delay(time, scheduler)
  time = type(time) ~= 'function' and util.constant(time) or time

  return Observable.create(function(observer)
    local actions = {}

    local function delay(key)
      return function(...)
        local arg = util.pack(...)
        local handle = scheduler:schedule(function()
          observer[key](observer, util.unpack(arg))
        end, time())
        table.insert(actions, handle)
      end
    end

    local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))

    return Subscription.create(function()
      if subscription then subscription:unsubscribe() end
      for i = 1, #actions do
        actions[i]:unsubscribe()
      end
    end)
  end)
end

--- Returns a new Observable that produces the values from the original with duplicates removed.
--- @return Observable
function Observable:distinct()
  return Observable.create(function(observer)
    local values = {}

    local function onNext(x)
      if not values[x] then
        observer:onNext(x)
      end

      values[x] = true
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that only produces values from the original if they are different from
-- the previous value.
--- @param comparator function - A function used to compare 2 values. If unspecified, == is used.
--- @return Observable
function Observable:distinctUntilChanged(comparator)
  comparator = comparator or util.eq

  return Observable.create(function(observer)
    local first = true
    local currentValue = nil

    local function onNext(value, ...)
      local values = util.pack(...)
      util.tryWithObserver(observer, function()
        if first or not comparator(value, currentValue) then
          observer:onNext(value, util.unpack(values))
          currentValue = value
          first = false
        end
      end)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that produces the nth element produced by the source Observable.
--- @param index number - The index of the item, with an index of 1 representing the first.
--- @return Observable
function Observable:elementAt(index)
  if not index or type(index) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local subscription
    local i = 1

    local function onNext(...)
      if i == index then
        observer:onNext(...)
        observer:onCompleted()
        if subscription then
          subscription:unsubscribe()
        end
      else
        i = i + 1
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end

--- Returns a new Observable that only produces values of the first that satisfy a predicate.
--- @param predicate function - The predicate used to filter values.
--- @return Observable
function Observable:filter(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        if predicate(...) then
          return observer:onNext(...)
        end
      end, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces the first value of the original that satisfies a
-- predicate.
--- @param predicate function - The predicate used to find a value.
function Observable:find(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        if predicate(...) then
          observer:onNext(...)
          return observer:onCompleted()
        end
      end, ...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that only produces the first result of the original.
--- @return Observable
function Observable:first()
  return self:take(1)
end

--- Returns a new Observable that transforms the items emitted by an Observable into Observables,
-- then flattens the emissions from those into a single Observable.
--- @param callback function - The function to transform values from the original Observable.
--- @return Observable
function Observable:flatMap(callback)
  callback = callback or util.identity
  return self:map(callback):flatten()
end

--- Returns a new Observable that uses a callback to create Observables from the values produced by
-- the source, then produces values from the most recent of these Observables.
--- @param callback function? - The function used to convert values to Observables. By default util.identity
--- @return Observable
function Observable:flatMapLatest(callback)
  callback = callback or util.identity
  return Observable.create(function(observer)
    local innerSubscription

    local function onNext(...)
      observer:onNext(...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    local function subscribeInner(...)
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      return util.tryWithObserver(observer, function(...)
        innerSubscription = callback(...):subscribe(onNext, onError)
      end, ...)
    end

    local subscription = self:subscribe(subscribeInner, onError, onCompleted)
    return Subscription.create(function()
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      if subscription then
        subscription:unsubscribe()
      end
    end)
  end)
end

--- Returns a new Observable that subscribes to the Observables produced by the original and
-- produces their values.
--- @return Observable
function Observable:flatten()
  return Observable.create(function(observer)
    local subscriptions = {}
    local remaining = 1

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      remaining = remaining - 1
      if remaining == 0 then
        return observer:onCompleted()
      end
    end

    local function onNext(observable)
      local function innerOnNext(...)
        observer:onNext(...)
      end

      remaining = remaining + 1
      local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
      subscriptions[#subscriptions + 1] = subscription
    end

    subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
    return Subscription.create(function ()
      for i = 1, #subscriptions do
        subscriptions[i]:unsubscribe()
      end
    end)
  end)
end

--- Returns an Observable that terminates when the source terminates but does not produce any
-- elements.
--- @return Observable
function Observable:ignoreElements()
  return Observable.create(function(observer)
    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(nil, onError, onCompleted)
  end)
end

--- Returns a new Observable that only produces the last result of the original.
--- @return Observable
function Observable:last()
  return Observable.create(function(observer)
    local value
    local empty = true

    local function onNext(...)
      value = {...}
      empty = false
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      if not empty then
        observer:onNext(util.unpack(value or {}))
      end

      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces the values of the original transformed by a function.
--- @param callback function - The function to transform values from the original Observable.
--- @return Observable
function Observable:map(callback)
  return Observable.create(function(observer)
    callback = callback or util.identity

    local function onNext(...)
      return util.tryWithObserver(observer, function(...)
        return observer:onNext(callback(...))
      end, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces the maximum value produced by the original.
--- @return Observable
function Observable:max()
  return self:reduce(math.max)
end

--- Returns a new Observable that produces the values produced by all the specified Observables in
-- the order they are produced.
--- @param ... Observable - One or more Observables to merge.
--- @return Observable
function Observable:merge(...)
  local sources = {...}
  table.insert(sources, 1, self)

  return Observable.create(function(observer)
    local completed = {}
    local subscriptions = {}

    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted(i)
      return function()
        table.insert(completed, i)

        if #completed == #sources then
          observer:onCompleted()
        end
      end
    end

    for i = 1, #sources do
      subscriptions[i] = sources[i]:subscribe(onNext, onError, onCompleted(i))
    end

    return Subscription.create(function ()
      for i = 1, #sources do
        if subscriptions[i] then subscriptions[i]:unsubscribe() end
      end
    end)
  end)
end

--- Returns a new Observable that produces the minimum value produced by the original.
--- @return Observable
function Observable:min()
  return self:reduce(math.min)
end

--- Returns an Observable that produces the values of the original inside tables.
--- @return Observable
function Observable:pack()
  return self:map(util.pack)
end

--- Returns two Observables: one that produces values for which the predicate returns truthy,
-- and another that produces values for which the predicate returns falsy.
--- @param predicate function - The predicate used to partition the values.
--- @return Observable, Observable
function Observable:partition(predicate)
  return self:filter(predicate), self:reject(predicate)
end

--- Returns a new Observable that produces values computed by extracting the given keys from the
-- tables produced by the original.
--- @param key string|number - The key to extract from the table.
--- @param ... string|number - Additional keys, used to recursively pluck values from nested tables.
--- @return Observable
function Observable:pluck(key, ...)
  if not key then return self end

  if type(key) ~= 'string' and type(key) ~= 'number' then
    return Observable.throw('pluck key must be a string or number')
  end

  return Observable.create(function(observer)
    local function onNext(t)
      return observer:onNext(t[key])
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end):pluck(...)
end

--- Returns a new Observable that produces a single value computed by accumulating the results of
-- running a function on each value produced by the original Observable.
--- @param accumulator function - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current values as the rest of the arguments.
--- @param seed any - A value to pass to the accumulator the first time it is run.
--- @return Observable
function Observable:reduce(accumulator, seed)
  return Observable.create(function(observer)
    local result = seed
    local first = true

    local function onNext(...)
      if first and seed == nil then
        result = ...
        first = false
      else
        return util.tryWithObserver(observer, function(...)
          result = accumulator(result, ...)
        end, ...)
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(result)
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces values from the original which do not satisfy a
-- predicate.
--- @param predicate function - The predicate used to reject values.
--- @return Observable
function Observable:reject(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        if not predicate(...) then
          return observer:onNext(...)
        end
      end, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that restarts in the event of an error.
--- @param count number - The maximum number of times to retry.  If left unspecified, an infinite number of retries will be attempted.
--- @return Observable
function Observable:retry(count)
  return Observable.create(function(observer)
    local subscription
    local retries = 0

    local function onNext(...)
      return observer:onNext(...)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    local function onError(message)
      if subscription then
        subscription:unsubscribe()
      end

      retries = retries + 1
      if count and retries > count then
        return observer:onError(message)
      end

      subscription = self:subscribe(onNext, onError, onCompleted)
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces its most recent value every time the specified observable
-- produces a value.
--- @param sampler Observable - The Observable that is used to sample values from this Observable.
--- @return Observable
function Observable:sample(sampler)
  if not sampler then error('Expected an Observable') end

  return Observable.create(function(observer)
    local latest = {}

    local function setLatest(...)
      latest = util.pack(...)
    end

    local function onNext()
      if #latest > 0 then
        return observer:onNext(util.unpack(latest))
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    local sourceSubscription = self:subscribe(setLatest, onError)
    local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)

    return Subscription.create(function()
      if sourceSubscription then sourceSubscription:unsubscribe() end
      if sampleSubscription then sampleSubscription:unsubscribe() end
    end)
  end)
end

--- Returns a new Observable that produces values computed by accumulating the results of running a
-- function on each value produced by the original Observable.
--- @param accumulator function - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current values as the rest of the arguments.  Each value returned from this function will be emitted by the Observable.
--- @param seed any - A value to pass to the accumulator the first time it is run.
--- @return Observable
function Observable:scan(accumulator, seed)
  return Observable.create(function(observer)
    local result = seed
    local first = true

    local function onNext(...)
      if first and seed == nil then
        result = ...
        first = false
      else
        return util.tryWithObserver(observer, function(...)
          result = accumulator(result, ...)
          observer:onNext(result)
        end, ...)
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
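
--[=[ Usage sketch contrasting scan and reduce (illustrative only; kept inside a comment so it
never runs when the library loads). It uses a Subject, defined further down in this file, as
the source. The name `numbers` is made up for the example.

  local numbers = Subject.create()
  local function add(acc, x) return acc + x end

  numbers:scan(add, 0):subscribe(print)    -- emits every intermediate total
  numbers:reduce(add, 0):subscribe(print)  -- emits only the final total

  numbers:onNext(1)      -- scan prints 1
  numbers:onNext(2)      -- scan prints 3
  numbers:onCompleted()  -- reduce prints 3

Note that with a nil seed both operators use the first value as the starting accumulator, and
scan does not emit that first value.
]=]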

--- Returns a new Observable that skips over a specified number of values produced by the original
-- and produces the rest.
--- @param n number - The number of values to ignore.
--- @return Observable
function Observable:skip(n)
  n = n or 1

  return Observable.create(function(observer)
    local i = 1

    local function onNext(...)
      if i > n then
        observer:onNext(...)
      else
        i = i + 1
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that omits a specified number of values from the end of the original
-- Observable.
--- @param count number - The number of items to omit from the end.
--- @return Observable
function Observable:skipLast(count)
  if not count or type(count) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    -- Keep the buffer per subscription so that separate subscribers do not share state.
    local buffer = {}

    local function emit()
      if #buffer > count and buffer[1] then
        local values = table.remove(buffer, 1)
        observer:onNext(util.unpack(values))
      end
    end

    local function onNext(...)
      emit()
      table.insert(buffer, util.pack(...))
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      emit()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that skips over values produced by the original until the specified
-- Observable produces a value.
--- @param other Observable - The Observable that triggers the production of values.
--- @return Observable
function Observable:skipUntil(other)
  return Observable.create(function(observer)
    local triggered = false
    local function trigger()
      triggered = true
    end

    other:subscribe(trigger, trigger, trigger)

    local function onNext(...)
      if triggered then
        observer:onNext(...)
      end
    end

    local function onError(message)
      if triggered then
        observer:onError(message)
      end
    end

    local function onCompleted()
      if triggered then
        observer:onCompleted()
      end
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
--- @param predicate function - The predicate used to continue skipping values.
--- @return Observable
function Observable:skipWhile(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local skipping = true

    local function onNext(...)
      if skipping then
        util.tryWithObserver(observer, function(...)
          skipping = predicate(...)
        end, ...)
      end

      if not skipping then
        return observer:onNext(...)
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces the specified values followed by all elements produced by
-- the source Observable.
--- @param ... any - The values to produce before the Observable begins producing values normally.
--- @return Observable
function Observable:startWith(...)
  local values = util.pack(...)
  return Observable.create(function(observer)
    observer:onNext(util.unpack(values))
    return self:subscribe(observer)
  end)
end

--- Returns an Observable that produces a single value representing the sum of the values produced
-- by the original.
--- @return Observable
function Observable:sum()
  return self:reduce(function(x, y) return x + y end, 0)
end

--- Given an Observable that produces Observables, returns an Observable that produces the values
-- produced by the most recently produced Observable.
--- @return Observable
function Observable:switch()
  return Observable.create(function(observer)
    local innerSubscription

    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    local function switch(source)
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      innerSubscription = source:subscribe(onNext, onError, nil)
    end

    local subscription = self:subscribe(switch, onError, onCompleted)
    return Subscription.create(function()
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      if subscription then
        subscription:unsubscribe()
      end
    end)
  end)
end

--- Returns a new Observable that only produces the first n results of the original.
--- @param n number - The number of elements to produce before completing.
--- @return Observable
function Observable:take(n)
  n = n or 1

  return Observable.create(function(observer)
    if n <= 0 then
      observer:onCompleted()
      return
    end

    local i = 1

    local function onNext(...)
      observer:onNext(...)

      i = i + 1

      if i > n then
        observer:onCompleted()
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that produces a specified number of elements from the end of a source
-- Observable.
--- @param count number - The number of elements to produce.
--- @return Observable
function Observable:takeLast(count)
  if not count or type(count) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local buffer = {}

    local function onNext(...)
      table.insert(buffer, util.pack(...))
      if #buffer > count then
        table.remove(buffer, 1)
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      for i = 1, #buffer do
        observer:onNext(util.unpack(buffer[i]))
      end
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that completes when the specified Observable fires.
--- @param other Observable - The Observable that triggers completion of the original.
--- @return Observable
function Observable:takeUntil(other)
  return Observable.create(function(observer)
    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    other:subscribe(onCompleted, onCompleted, onCompleted)

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns a new Observable that produces elements until the predicate returns falsy.
--- @param predicate function - The predicate used to continue production of values.
--- @return Observable
function Observable:takeWhile(predicate)
  predicate = predicate or util.identity

  return Observable.create(function(observer)
    local taking = true

    local function onNext(...)
      if taking then
        util.tryWithObserver(observer, function(...)
          taking = predicate(...)
        end, ...)

        if taking then
          return observer:onNext(...)
        else
          return observer:onCompleted()
        end
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Runs a function each time this Observable has activity. Similar to subscribe but does not
-- create a subscription.
--- @param _onNext fun(...)? - Run when the Observable produces values.
--- @param _onError fun(message:string)? - Run when the Observable encounters a problem.
--- @param _onCompleted function? - Run when the Observable completes.
--- @return Observable
function Observable:tap(_onNext, _onError, _onCompleted)
  _onNext = _onNext or util.noop
  _onError = _onError or util.noop
  _onCompleted = _onCompleted or util.noop

  return Observable.create(function(observer)
    local function onNext(...)
      util.tryWithObserver(observer, function(...)
        _onNext(...)
      end, ...)

      return observer:onNext(...)
    end

    local function onError(message)
      util.tryWithObserver(observer, function()
        _onError(message)
      end)

      return observer:onError(message)
    end

    local function onCompleted()
      util.tryWithObserver(observer, function()
        _onCompleted()
      end)

      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that unpacks the tables produced by the original.
--- @return Observable
function Observable:unpack()
  return self:map(util.unpack)
end

--- Returns an Observable that takes any values produced by the original that consist of multiple
-- return values and produces each value individually.
--- @return Observable
function Observable:unwrap()
  return Observable.create(function(observer)
    local function onNext(...)
      local values = {...}
      for i = 1, #values do
        observer:onNext(values[i])
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that produces a sliding window of the values produced by the original.
--- @param size number - The size of the window. The returned observable will produce this number of the most recent values as multiple arguments to onNext.
--- @return Observable
function Observable:window(size)
  if not size or type(size) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local window = {}

    local function onNext(value)
      table.insert(window, value)

      if #window >= size then
        observer:onNext(util.unpack(window))
        table.remove(window, 1)
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end

--- Returns an Observable that produces values from the original along with the most recently
-- produced value from all other specified Observables. Note that only the first argument from each
-- source Observable is used.
--- @param ... Observable - The Observables to include the most recent values from.
--- @return Observable
function Observable:with(...)
  local sources = {...}

  return Observable.create(function(observer)
    local latest = setmetatable({}, {__len = util.constant(#sources)})
    local subscriptions = {}

    local function setLatest(i)
      return function(value)
        latest[i] = value
      end
    end

    local function onNext(value)
      return observer:onNext(value, util.unpack(latest))
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    for i = 1, #sources do
      subscriptions[i] = sources[i]:subscribe(setLatest(i), util.noop, util.noop)
    end

    subscriptions[#sources + 1] = self:subscribe(onNext, onError, onCompleted)
    return Subscription.create(function ()
      for i = 1, #sources + 1 do
        if subscriptions[i] then subscriptions[i]:unsubscribe() end
      end
    end)
  end)
end

--- Returns an Observable that merges the values produced by the source Observables by grouping them
-- by their index.  The first onNext event contains the first value of all of the sources, the
-- second onNext event contains the second value of all of the sources, and so on.  onNext is called
-- a number of times equal to the number of values produced by the Observable that produces the
-- fewest number of values.
--- @param ... Observable - The Observables to zip.
--- @return Observable
function Observable.zip(...)
  local sources = util.pack(...)
  local count = #sources

  return Observable.create(function(observer)
    local values = {}
    local active = {}
    local subscriptions = {}
    for i = 1, count do
      values[i] = {n = 0}
      active[i] = true
    end

    local function onNext(i)
      return function(value)
        table.insert(values[i], value)
        values[i].n = values[i].n + 1

        local ready = true
        for i = 1, count do
          if values[i].n == 0 then
            ready = false
            break
          end
        end

        if ready then
          local payload = {}

          for i = 1, count do
            payload[i] = table.remove(values[i], 1)
            values[i].n = values[i].n - 1
          end

          observer:onNext(util.unpack(payload))
        end
      end
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted(i)
      return function()
        active[i] = nil
        if not next(active) or values[i].n == 0 then
          return observer:onCompleted()
        end
      end
    end

    for i = 1, count do
      subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
    end

    return Subscription.create(function()
      for i = 1, count do
        if subscriptions[i] then subscriptions[i]:unsubscribe() end
      end
    end)
  end)
end
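
--[=[ Usage sketch for Observable.zip (illustrative only; kept inside a comment so it never runs
when the library loads). It uses Subjects, defined further down in this file, as sources. The
names `a` and `b` are made up for the example.

  local a, b = Subject.create(), Subject.create()
  Observable.zip(a, b):subscribe(function(x, y) print(x, y) end)

  a:onNext(1)    -- buffered: b has not produced anything yet
  a:onNext(2)    -- buffered
  b:onNext('x')  -- pairs with the first value of a: prints 1 and x
  b:onNext('y')  -- pairs with the second value of a: prints 2 and y
]=]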

--- @class ImmediateScheduler
-- @description Schedules Observables by running all operations immediately.
ImmediateScheduler = {}
ImmediateScheduler.__index = ImmediateScheduler
ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')

--- Creates a new ImmediateScheduler.
--- @return ImmediateScheduler
function ImmediateScheduler.create()
  return setmetatable({}, ImmediateScheduler)
end

--- Schedules a function to be run on the scheduler. It is executed immediately.
--- @param action function - The function to execute.
function ImmediateScheduler:schedule(action)
  action()
end

--- @class CooperativeScheduler
--- @field tasks {thread: thread, due: number}[]
--- @field currentTime number
-- @description Manages Observables using coroutines and a virtual clock that must be updated
-- manually.
CooperativeScheduler = {}
CooperativeScheduler.__index = CooperativeScheduler
CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')

--- Creates a new CooperativeScheduler.
--- @param currentTime number - A time to start the scheduler at.
--- @return CooperativeScheduler
function CooperativeScheduler.create(currentTime)
  local self = {
    tasks = {},
    currentTime = currentTime or 0
  }

  return setmetatable(self, CooperativeScheduler)
end

--- Schedules a function to be run after an optional delay.  Returns a subscription that will stop
-- the action from running.
--- @param action function - The function to execute. Will be converted into a coroutine. The coroutine may yield execution back to the scheduler with an optional number, which will put it to sleep for a time period.
--- @param delay number - Delay execution of the action by a virtual time period.
--- @return Subscription
function CooperativeScheduler:schedule(action, delay)
  local task = {
    thread = coroutine.create(action),
    due = self.currentTime + (delay or 0)
  }

  table.insert(self.tasks, task)

  return Subscription.create(function()
    return self:unschedule(task)
  end)
end

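--- Removes a previously scheduled task from the queue so that it never runs.
--- @param task {thread: thread, due: number} - The task to remove.
function CooperativeScheduler:unschedule(task)
  for i = 1, #self.tasks do
    if self.tasks[i] == task then
      table.remove(self.tasks, i)
      return
    end
  end
end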

--- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
-- will run any coroutines that are due to be run.
--- @param delta number - An amount of time to advance the clock by. It is common to pass in the time in seconds or milliseconds elapsed since this function was last called.
function CooperativeScheduler:update(delta)
  self.currentTime = self.currentTime + (delta or 0)

  local i = 1
  while i <= #self.tasks do
    local task = self.tasks[i]

    if self.currentTime >= task.due then
      local success, delay = coroutine.resume(task.thread)

      if coroutine.status(task.thread) == 'dead' then
        table.remove(self.tasks, i)
      else
        task.due = math.max(task.due + (delay or 0), self.currentTime)
        i = i + 1
      end

      if not success then
        error(delay)
      end
    else
      i = i + 1
    end
  end
end

--- Returns whether or not the CooperativeScheduler's queue is empty.
function CooperativeScheduler:isEmpty()
  return not next(self.tasks)
end
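
--[=[ Usage sketch for CooperativeScheduler (illustrative only; kept inside a comment so it never
runs when the library loads). The tick size of 0.5 and the name `scheduler` are arbitrary
choices for the example; in a map the update call would typically be driven by a periodic timer.

  local scheduler = CooperativeScheduler.create(0)

  scheduler:schedule(function()
    print('step 1')
    coroutine.yield(1)  -- sleep for 1 unit of virtual time
    print('step 2')
  end)

  scheduler:update(0.5)       -- clock is 0.5: prints step 1, task is now due at time 1
  scheduler:update(0.5)       -- clock is 1: prints step 2, the finished task is removed
  print(scheduler:isEmpty())  -- true
]=]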

--- @class TimeoutScheduler
-- @description A scheduler that uses luvit's timer library to schedule events on an event loop.
TimeoutScheduler = {}
TimeoutScheduler.__index = TimeoutScheduler
TimeoutScheduler.__tostring = util.constant('TimeoutScheduler')

--- Creates a new TimeoutScheduler.
--- @return TimeoutScheduler
function TimeoutScheduler.create()
  return setmetatable({}, TimeoutScheduler)
end

--- Schedules an action to run at a future point in time.
--- @param action function - The action to run.
--- @param delay number - The delay, in milliseconds.
--- @param ... any - Extra arguments forwarded to timer.setTimeout along with the action.
--- @return Subscription
function TimeoutScheduler:schedule(action, delay, ...)
  local timer = require 'timer'
  local handle = timer.setTimeout(delay, action, ...)
  return Subscription.create(function()
    timer.clearTimeout(handle)
  end)
end

--- @class Subject : Observer, Observable
--- @field observers Observer[]
-- Subjects function both as an Observer and as an Observable. Subjects inherit all
-- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
-- be broadcasted to any subscribed Observers.
Subject = setmetatable({}, Observable)
Subject.__index = Subject
Subject.__tostring = util.constant('Subject')

--- Creates a new Subject.
--- @return Subject
function Subject.create()
  return setmetatable({
    observers = {},
    stopped = false
  }, Subject) --[[@as Subject]]
end

--- Creates a new Observer and attaches it to the Subject.
--- @param onNext fun(...)|Observer - A function called when the Subject produces a value or an existing Observer to attach to the Subject.
--- @param onError function? - Called when the Subject terminates due to an error.
--- @param onCompleted function? - Called when the Subject completes normally.
function Subject:subscribe(onNext, onError, onCompleted)
  local observer

  if util.isa(onNext, Observer) then
    observer = onNext
  else
    observer = Observer.create(onNext --[[@as fun(...)]], onError, onCompleted)
  end

  table.insert(self.observers, observer)

  return Subscription.create(function()
    for i = 1, #self.observers do
      if self.observers[i] == observer then
        table.remove(self.observers, i)
        return
      end
    end
  end)
end

--- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
--- @param ... any
function Subject:onNext(...)
  if not self.stopped then
    for i = #self.observers, 1, -1 do
      self.observers[i]:onNext(...)
    end
  end
end

--- Signal to all Observers that an error has occurred.
--- @param message string - A string describing what went wrong.
function Subject:onError(message)
  if not self.stopped then
    for i = #self.observers, 1, -1 do
      self.observers[i]:onError(message)
    end

    self.stopped = true
  end
end

--- Signal to all Observers that the Subject will not produce any more values.
function Subject:onCompleted()
  if not self.stopped then
    for i = #self.observers, 1, -1 do
      self.observers[i]:onCompleted()
    end

    self.stopped = true
  end
end

Subject.__call = Subject.onNext
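
--[=[ Usage sketch for Subject (illustrative only; kept inside a comment so it never runs when the
library loads). The names `subject` and `subscription` are made up for the example.

  local subject = Subject.create()
  local subscription = subject:subscribe(function(value) print('got', value) end)

  subject:onNext(1)            -- prints got 1
  subject(2)                   -- __call is an alias for onNext: prints got 2
  subscription:unsubscribe()   -- detaches the observer
  subject:onNext(3)            -- nothing is printed
]=]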

--- @class AsyncSubject: Subject
-- @description AsyncSubjects are subjects that produce either no values or a single value.  If
-- multiple values are produced via onNext, only the last one is used.  If onError is called, then
-- no value is produced and onError is called on any subscribed Observers.  If an Observer
-- subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the
-- value or the error.
AsyncSubject = setmetatable({}, Observable)
AsyncSubject.__index = AsyncSubject
AsyncSubject.__tostring = util.constant('AsyncSubject')

--- Creates a new AsyncSubject.
--- @return AsyncSubject
function AsyncSubject.create()
  local self = {
    observers = {},
    stopped = false,
    value = nil,
    errorMessage = nil
  }

  return setmetatable(self, AsyncSubject) --[[@as AsyncSubject]]
end

--- Creates a new Observer and attaches it to the AsyncSubject.
--- @param onNext fun(...)|Observer - A function called when the AsyncSubject produces a value or an existing Observer to attach to the AsyncSubject.
--- @param onError function? - Called when the AsyncSubject terminates due to an error.
--- @param onCompleted function? - Called when the AsyncSubject completes normally.
function AsyncSubject:subscribe(onNext, onError, onCompleted)
  local observer

  if util.isa(onNext, Observer) then
    observer = onNext
  else
    observer = Observer.create(onNext --[[@as fun(...)]], onError, onCompleted)
  end

  -- Only replay once the AsyncSubject has terminated; until then the value may still change.
  if self.errorMessage then
    observer:onError(self.errorMessage)
    return
  elseif self.stopped then
    if self.value then
      observer:onNext(util.unpack(self.value))
    end
    observer:onCompleted()
    return
  end

  table.insert(self.observers, observer)

  return Subscription.create(function()
    for i = 1, #self.observers do
      if self.observers[i] == observer then
        table.remove(self.observers, i)
        return
      end
    end
  end)
end

--- Pushes zero or more values to the AsyncSubject.
--- @param ... any
function AsyncSubject:onNext(...)
  if not self.stopped then
    self.value = util.pack(...)
  end
end

--- Signal to all Observers that an error has occurred.
--- @param message string - A string describing what went wrong.
function AsyncSubject:onError(message)
  if not self.stopped then
    self.errorMessage = message

    for i = 1, #self.observers do
      self.observers[i]:onError(self.errorMessage)
    end

    self.stopped = true
  end
end

--- Signal to all Observers that the AsyncSubject will not produce any more values.
function AsyncSubject:onCompleted()
  if not self.stopped then
    for i = 1, #self.observers do
      if self.value then
        self.observers[i]:onNext(util.unpack(self.value))
      end

      self.observers[i]:onCompleted()
    end

    self.stopped = true
  end
end

AsyncSubject.__call = AsyncSubject.onNext

--- @class BehaviorSubject : Subject
-- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
-- recent pushed value, and all subscribers immediately receive the latest value.
BehaviorSubject = setmetatable({}, Subject)
BehaviorSubject.__index = BehaviorSubject
BehaviorSubject.__tostring = util.constant('BehaviorSubject')

--- Creates a new BehaviorSubject.
--- @param ... any - The initial values.
--- @return BehaviorSubject
function BehaviorSubject.create(...)
  local self = {
    observers = {},
    stopped = false
  }

  if select('#', ...) > 0 then
    self.value = util.pack(...)
  end

  return setmetatable(self, BehaviorSubject) --[[@as BehaviorSubject]]
end

--- Creates a new Observer and attaches it to the BehaviorSubject. Immediately broadcasts the most
-- recent value to the Observer.
--- @param onNext function - Called when the BehaviorSubject produces a value.
--- @param onError function - Called when the BehaviorSubject terminates due to an error.
--- @param onCompleted function - Called when the BehaviorSubject completes normally.
function BehaviorSubject:subscribe(onNext, onError, onCompleted)
  local observer

  if util.isa(onNext, Observer) then
    observer = onNext
  else
    observer = Observer.create(onNext, onError, onCompleted)
  end

  local subscription = Subject.subscribe(self, observer)

  if self.value then
    observer:onNext(util.unpack(self.value))
  end

  return subscription
end

--- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
--- @param ... any
function BehaviorSubject:onNext(...)
  self.value = util.pack(...)
  return Subject.onNext(self, ...)
end

--- Returns the last value emitted by the BehaviorSubject, or the initial value passed to the
-- constructor if nothing has been emitted yet.
--- @return any
function BehaviorSubject:getValue()
  if self.value ~= nil then
    return util.unpack(self.value)
  end
end

BehaviorSubject.__call = BehaviorSubject.onNext

--- @class ReplaySubject : Subject
--- @field buffer any[]
--- @field bufferSize integer
-- @description A Subject that provides new Subscribers with some or all of the most recently
-- produced values upon subscription.
ReplaySubject = setmetatable({}, Subject)
ReplaySubject.__index = ReplaySubject
ReplaySubject.__tostring = util.constant('ReplaySubject')

--- Creates a new ReplaySubject.
--- @param bufferSize number? - The number of values to send to new subscribers. If nil, an infinite buffer is used (note that this could lead to memory issues).
--- @return ReplaySubject
function ReplaySubject.create(bufferSize)
  local self = {
    observers = {},
    stopped = false,
    buffer = {},
    bufferSize = bufferSize
  }

  return setmetatable(self, ReplaySubject) --[[@as ReplaySubject]]
end

--- Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the
-- contents of the buffer to the Observer.
--- @param onNext function - Called when the ReplaySubject produces a value.
--- @param onError function - Called when the ReplaySubject terminates due to an error.
--- @param onCompleted function - Called when the ReplaySubject completes normally.
function ReplaySubject:subscribe(onNext, onError, onCompleted)
  local observer

  if util.isa(onNext, Observer) then
    observer = onNext
  else
    observer = Observer.create(onNext, onError, onCompleted)
  end

  local subscription = Subject.subscribe(self, observer)

  for i = 1, #self.buffer do
    observer:onNext(util.unpack(self.buffer[i]))
  end

  return subscription
end

--- Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.
--- @param ... any
function ReplaySubject:onNext(...)
  table.insert(self.buffer, util.pack(...))
  if self.bufferSize and #self.buffer > self.bufferSize then
    table.remove(self.buffer, 1)
  end

  return Subject.onNext(self, ...)
end

ReplaySubject.__call = ReplaySubject.onNext

Observable.wrap = Observable.buffer
Observable['repeat'] = Observable.replicate
end
Lua:
-- Scheduler for Rx.Lua using TimerQueue & Stopwatch library

---@class Scheduler
---@field tasks Observer
local Scheduler = {}

local timerQueue

function Scheduler.create()
    timerQueue = TimerQueue.create()
    return Scheduler --apparently I need an instance.
end

---@param action function
---@param delay number
function Scheduler:schedule(action, delay)
    return timerQueue:callDelayed(delay, action)
end
TimerQueue & Stopwatch
Definitive Doubly-Linked List

Lua:
if Debug then Debug.beginFile "TaskObservable" end
do
    ---@class TaskObservable : Observable
    ---@field _subscribe fun(observer: TaskObserver): any?
    -- Observables push values to Observers.
    TaskObservable = {}
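    TaskObservable.__index = TaskObservable
    setmetatable(TaskObservable, Observable) -- inherit the Observable operators, as the class annotation promises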
   
    --- Creates a new TaskObservable.
    ---@param subscribe fun(observer: TaskObserver): any? The subscription function that produces values.
    ---@return TaskObservable
    function TaskObservable.create(subscribe)
      return setmetatable(Observable.create(subscribe), TaskObservable) --[[@as TaskObservable]]
    end
   
    --- Shorthand for creating a TaskObserver and passing it to this TaskObservable's subscription function.
    ---@generic T
    ---@param onNext table|fun(value: T, delay: number) Called when the TaskObservable produces a value.
    ---@param onError fun(message: string, delay: number)? Called when the TaskObservable terminates due to an error.
    ---@param onCompleted fun(delay: number)? Called when the TaskObservable completes normally.
    function TaskObservable:subscribe(onNext, onError, onCompleted)
      if type(onNext) == 'table' then
        return self._subscribe(onNext)
      else
        return self._subscribe(TaskObserver.create(onNext, onError, onCompleted))
      end
    end
end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskObserver" end
do
    ---@class TaskObserver : Observer
    ---@field _onNext fun(value: unknown, delay: number)?
    ---@field _onError fun(message: string, delay: number)?
    ---@field _onCompleted fun(delay: number)?
    TaskObserver = {}
    TaskObserver.__index = TaskObserver
    setmetatable(TaskObserver, Observer) -- TaskObserver extends Observer, not Observable

    ---@generic T
    ---@param onNext fun(value: T, delay: number)
    ---@param onError fun(message: string, delay: number)?
    ---@param onCompleted fun(delay: number)?
    ---@return TaskObserver
    function TaskObserver.create(onNext, onError, onCompleted)
        return setmetatable(Observer.create(onNext, onError, onCompleted), TaskObserver) --[[@as TaskObserver]]
    end

    --- Pushes a value to the Observer.
    ---@generic T
    ---@param value T
    ---@param delay number
    function TaskObserver:onNext(value, delay)
        if not self.stopped then
            self._onNext(value, delay)
        end
    end

    --- Notify the Observer that an error has occurred.
    ---@param message string A string describing what went wrong.
    ---@param delay number
    function TaskObserver:onError(message, delay)
        if not self.stopped then
            self.stopped = true
            self._onError(message, delay)
        end
    end

    --- Notify the Observer that the sequence has completed and will produce no more values.
    ---@param delay number
    function TaskObserver:onCompleted(delay)
        if not self.stopped then
            self.stopped = true
            self._onCompleted(delay)
        end
    end

end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskSubject" end
do
    ---@class TaskSubject : AsyncSubject
    ---@field observers TaskObserver[]
    TaskSubject = {}
    TaskSubject.__index = TaskSubject
    setmetatable(TaskSubject, AsyncSubject)

    ---@generic T
    ---@param task Task<T>
    ---@return TaskSubject
    function TaskSubject.create(task)
        -- Named 'self' so the local doesn't shadow the TaskSubject class table.
        local self = setmetatable({
            taskRef = task,
            observers = {},
            stopped = false
        }, TaskSubject)
        return self --[[@as TaskSubject]]
    end

    --- Creates a new Observer and attaches it to the TaskSubject.
    ---@generic T
    ---@param onNext fun(value: T, delay: number) - Called when the TaskSubject produces a value.
    ---@param onError fun(message: string, delay: number)? - Called when the TaskSubject terminates due to an error.
    ---@param onCompleted fun(delay: number)? - Called when the TaskSubject completes normally.
    ---@return Subscription?
    function TaskSubject:subscribe(onNext, onError, onCompleted)
        local observer = TaskObserver.create(onNext, onError, onCompleted) --[[@as TaskObserver]]
        if self.value then
            observer:onNext(self.value, self.delay)
            observer:onCompleted(self.delay)
            return
        elseif self.errorMessage then
            observer:onError(self.errorMessage, self.delay)
            return
        end

        table.insert(self.observers, observer)

        return Subscription.create(function()
            for i = 1, #self.observers do
                if self.observers[i] == observer then
                    table.remove(self.observers, i)
                    return
                end
            end
        end)
    end

    --- Pushes a value to the TaskSubject.
    ---@generic T
    ---@param value T
    ---@param delay number
    function TaskSubject:onNext(value, delay)
      if not self.stopped then
        self.value = value
        self.delay = delay
      end
    end
   
    --- Signal to all Observers that an error has occurred.
    ---@param message string - A string describing what went wrong.
    ---@param delay number
    function TaskSubject:onError(message, delay)
      if not self.stopped then
        self.errorMessage = message
   
        for i = 1, #self.observers do
          self.observers[i]:onError(self.errorMessage, delay)
        end
   
        self.stopped = true
      end
    end
   
    --- Signal to all Observers that the TaskSubject will not produce any more values.
    ---@param delay number
    function TaskSubject:onCompleted(delay)
      if not self.stopped then
        for i = 1, #self.observers do
          if self.value then
            self.observers[i]:onNext(self.value, self.delay)
          end
   
          self.observers[i]:onCompleted(delay)
        end
   
        self.stopped = true
      end
    end

end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskProcessor" end

-- Requires: TimerQueue & Stopwatch, Definitive Doubly-Linked List and RxLua

do
    local WC3_OP_LIMIT = 1666666 -- JASS's max OP limit, used as reference to how many operations would lua be able to do without in-game lags. Very experimental and not yet confirmed.
    -- How many operations can all processors in total use up in comparison to WC3_OP_LIMIT constant.
    -- Leftover is used up by the game's internal machinations and other triggers not registered with any processor.
    local GLOBAL_PROCESS_RATIO = 0.8 -- from 0.0 to 1.0
    local PROCESSOR_PERIOD = 0.02
    local DELAY_MULTIPLIER = 1/PROCESSOR_PERIOD

    -- ========================================= --
    -- Internal variables --
    -- ========================================= --
    local globalProcessOpLimit = WC3_OP_LIMIT*GLOBAL_PROCESS_RATIO*PROCESSOR_PERIOD
    local processors = {} --- @type Processor[]

    local function refreshProcessorsOpLimits()
        if #processors == 0 then return end
        local totalRatio = 0
        for _, processor in ipairs(processors) do
            totalRatio = totalRatio + processor.ratio;
        end
        for _, processor in ipairs(processors) do
            processor.opLimit = globalProcessOpLimit*(processor.ratio/totalRatio)
        end
    end

    ---@class Task
    ---@field fn fun(delay: number)
    ---@field opCount integer
    ---@field promise TaskSubject
    ---@field requestTime number
    Task = {}
    Task.__index = Task

    ---@param fn fun(delay: number)
    ---@param opCount integer
    ---@param currentTime number
    ---@return Task
    local function createTask(fn, opCount, currentTime)
        return setmetatable({
            fn = fn,
            opCount = opCount,
            requestTime = currentTime
        }, Task)
    end

    ---@class PeriodicTask : Task
    ---@field period number
    ---@field fn fun(delay: number): boolean done
    PeriodicTask = setmetatable({}, Task)
    PeriodicTask.__index = PeriodicTask

    ---@param task Task
    ---@param period number
    ---@return PeriodicTask
    local function createPeriodicTask(task, period)
        task.period = period
        return setmetatable(task, PeriodicTask) --[[@as PeriodicTask]]
    end

    ---@class TaskBucket
    ---@field tasks LinkedListHead
    ---@field opCount integer

    ---@param processor Processor
    ---@param task Task
    local function enqueueToAvailableTaskBucket(processor, task)
        if processor.taskExecutor.paused then
            processor.taskExecutor:resume()
        end
        local chosenBucket ---@type TaskBucket
        if processor.taskBuckets then
            for _, bucket in ipairs(processor.taskBuckets) do
                if bucket.opCount + task.opCount <= processor.opLimit then
                    chosenBucket = bucket
                end
            end
        end

        if chosenBucket == nil then
            -- No existing bucket has room for this task: open a new one.
            local tasks = LinkedList.create()
            tasks:insert(task)
            table.insert(processor.taskBuckets, {tasks = tasks, opCount = task.opCount} --[[@as TaskBucket]])
        else
            chosenBucket.tasks:insert(task)
            chosenBucket.opCount = chosenBucket.opCount + task.opCount
        end
    end

    ---@class Processor
    ---@field ratio integer -- readOnly
    ---@field opLimit integer -- readOnly, how many operations can this processor do in a game-tick
    ---@field taskBuckets TaskBucket[] -- readOnly, don't touch!
    ---@field taskExecutor TimerQueue -- readOnly
    ---@field clock Stopwatch -- readOnly
    ---@field currentBucketIndex integer -- readOnly
    Processor = {}
    Processor.__index = Processor

    ---@param bucket TaskBucket
    ---@param task LinkedListNode
    local function dequeueTask(bucket, task)
        bucket.opCount = bucket.opCount - task.value--[[@as Task]].opCount
        task:remove()
    end

    ---@param processor Processor
    ---@param bucket TaskBucket
    ---@param currentTime number
    local function processTasks(processor, bucket, currentTime)
        if bucket.tasks.n > 0 then
            for taskNode in bucket.tasks:loop() do
                local task = taskNode.value ---@type Task
                local delay = (currentTime - task.requestTime)*DELAY_MULTIPLIER
                local status, result = pcall(task.fn, delay)

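                if status then
                    task.promise:onNext(result, delay)

                    if task--[[@as PeriodicTask]].period and result == false then
                        -- Periodic task isn't done yet: keep it queued for the next pass.
                        task.requestTime = currentTime
                    else
                        dequeueTask(bucket, taskNode)
                        task.promise:onCompleted(delay)
                    end
                else
                    -- A failed task has to leave the bucket as well, otherwise it would be
                    -- re-run every cycle and the tasks queued behind it would never run.
                    dequeueTask(bucket, taskNode)
                    task.promise:onError(result, delay)
                end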
            end
        end
    end

    ---@param processor Processor
    local function process(processor)
        local bucketAmount = #processor.taskBuckets
        if bucketAmount == 0 then
            processor.taskExecutor:pause()
            return
        elseif processor.currentBucketIndex > bucketAmount then
            processor.currentBucketIndex = 1
        end

        local bucket = processor.taskBuckets[processor.currentBucketIndex]
        processTasks(processor, bucket, processor.clock:getElapsed())

        if bucket.opCount == 0 then
            table.remove(processor.taskBuckets, processor.currentBucketIndex)
            if bucketAmount - 1 == 0 then
                processor.taskExecutor:pause()
                processor.currentBucketIndex = 1
            end
        else
            processor.currentBucketIndex = processor.currentBucketIndex + 1
        end
    end

    ---@param fn fun(delay: number):boolean done
    ---@param fnOpCount integer
    ---@param period number?
    ---@return TaskObservable
    function Processor:enqueueTask(fn, fnOpCount, period)
        assert(type(fn) == "function", "Parameter 'fn' must be a function.")
        assert(type(fnOpCount) == "number" and math.floor(fnOpCount) == fnOpCount, "Parameter 'fnOpCount' must be an integer.")
        local task = createTask(fn, fnOpCount, self.clock:getElapsed())
        if period ~= nil then
            assert(type(period) == 'number' and period > 0, "Parameter 'period' must be a number and higher than 0.")
            task = createPeriodicTask(task, period)
        end
        task.promise = TaskSubject.create(task)
        enqueueToAvailableTaskBucket(self, task)

        return task.promise --[[@as TaskObservable]]
    end

    ---@param ratio integer integer higher or equal to 1, will re-adjust this new processor and other existing processors op limit in comparison to these numbers.
    ---@return Processor
    function Processor.create(ratio)
        assert((type(ratio) == "number" and math.floor(ratio) == ratio), "Parameter 'ratio' must be an integer")
        assert(ratio >= 1, "Parameter 'ratio' must be higher or equal to 1")
        local instance = setmetatable({
            ratio = ratio,
            -- opLimit is set by refreshProcessorsOpLimits
            taskBuckets = {},
            taskExecutor = TimerQueue.create(),
            currentBucketIndex = 1,
            clock = Stopwatch.create(true)
        }, Processor)
        table.insert(processors, instance)
        refreshProcessorsOpLimits()
        instance.taskExecutor:pause();
        instance.taskExecutor:callPeriodically(PROCESSOR_PERIOD, nil, process, instance)
        return instance
    end

end
if Debug then Debug.endFile() end

Using information from this thread, namely that the JASS VM can run up to 1666666 operations per second, I've set an arbitrary limit on how many operations all processor instances combined can do in a single game tick.
Lua:
local WC3_OP_LIMIT = 1666666 -- JASS's max OP limit, used as reference to how many operations would Lua be able to do without in-game lags. Very experimental and not yet confirmed.
-- How many operations can all processors in total use up in comparison to WC3_OP_LIMIT constant.
-- Leftover is used up by the game's internal machinations and other triggers not registered with any processor.
local GLOBAL_PROCESS_RATIO = 0.8 -- from 0.0 to 1.0
local PROCESSOR_PERIOD = 0.02
...
local globalProcessOpLimit = WC3_OP_LIMIT*GLOBAL_PROCESS_RATIO*PROCESSOR_PERIOD
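To make the numbers concrete, here's a small standalone sketch of the budget arithmetic. It uses the same constants as above and splits the budget the way refreshProcessorsOpLimits does; the `splitBudget` helper and the example ratios are made up for illustration and aren't part of the library.

```lua
local WC3_OP_LIMIT = 1666666
local GLOBAL_PROCESS_RATIO = 0.8
local PROCESSOR_PERIOD = 0.02

-- Operations all processors together may use in one 0.02 second cycle.
local globalProcessOpLimit = WC3_OP_LIMIT * GLOBAL_PROCESS_RATIO * PROCESSOR_PERIOD

-- Hypothetical helper: share the global budget between processors by ratio.
local function splitBudget(ratios)
    local totalRatio = 0
    for _, ratio in ipairs(ratios) do
        totalRatio = totalRatio + ratio
    end
    local limits = {}
    for i, ratio in ipairs(ratios) do
        limits[i] = globalProcessOpLimit * (ratio / totalRatio)
    end
    return limits
end

-- Two processors created with Processor.create(1) and Processor.create(3).
local limits = splitBudget({1, 3})
print(globalProcessOpLimit)  -- roughly 26666.66 operations per cycle
print(limits[1], limits[2])  -- roughly a quarter and three quarters of that
```

So with the default settings a single processor gets a bit under 27k operations per cycle, and every extra processor only changes how that same budget is shared.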

Note: delay is specified in game ticks (how many increments of 0.02 seconds have passed)
Lua:
myProcessor = Processor.create(1)
myProcessor:enqueueTask( -- enqueue a one-shot task to be executed as soon as possible.
    ---@param delay number
    ---@return any value to be passed to subscribed function
    function(delay)
     -- do stuff here
    end, FUNCTION_OP_COUNT):subscribe(
    ---@param value any
    ---@param delay number
    function(value, delay)
        -- called with the task's return value once it has run (skipped if it returned nil or false)
    end,
    ---@param message string
    ---@param delay number
    function(message, delay)
        -- called if the task threw an error
    end,
    ---@param delay number
    function(delay)
        -- called when the task has completed
    end
)

myProcessor:enqueueTask( -- enqueue a periodic task
    ---@param delay number
    ---@return boolean done return false to keep the task running; any other value ends it
    function(delay)
     -- do stuff here
    end, FUNCTION_OP_COUNT, PERIOD):subscribe(
    ---@param value any
    ---@param delay number
    function(value, delay)
        -- called with the task's final return value once the task is done
    end,
    ---@param message string
    ---@param delay number
    function(message, delay)
        -- called if the task threw an error
    end,
    ---@param delay number
    function(delay)
        -- called when the task has completed
    end
)

FUNCTION_OP_COUNT is a placeholder: the developer has to work out the function's operation count, and the TaskProcessor uses it to decide how many tasks fit into one execution bucket.
Every function is bound to have a different op count. You can use a single constant everywhere, but it probably won't be optimal.
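To show why the op count matters, here's a rough standalone sketch of how tasks get packed into buckets. It's a simplification under a few assumptions: plain tables instead of the linked list, the first bucket with room wins (the library's own loop keeps the last one that fits), and `OP_LIMIT`, `buckets` and `enqueue` are made-up names.

```lua
local OP_LIMIT = 100 -- hypothetical per-cycle budget of one processor
local buckets = {}

-- Put the task into the first bucket that still has room, or open a new one.
local function enqueue(name, opCount)
    for index, bucket in ipairs(buckets) do
        if bucket.opCount + opCount <= OP_LIMIT then
            bucket.tasks[#bucket.tasks + 1] = name
            bucket.opCount = bucket.opCount + opCount
            return index
        end
    end
    buckets[#buckets + 1] = { tasks = { name }, opCount = opCount }
    return #buckets
end

enqueue("a", 60)
enqueue("b", 60) -- doesn't fit next to "a", so it opens bucket 2
enqueue("c", 40) -- still fits next to "a" in bucket 1

for index, bucket in ipairs(buckets) do
    print(index, table.concat(bucket.tasks, ","), bucket.opCount)
end
```

Since the processor works through one bucket per cycle, underestimating the op count crams too much work into a single cycle, while overestimating it spreads the work over more cycles than necessary.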

Here's an example of converting simple code (the first function) into its more complicated, task-based version (the second function).

Lua:
---@param rect rect
function damageUnitsInRect(rect)
    local group = CreateGroup()
    GroupEnumUnitsInRect(group, rect, nil)
    ForGroup(group, function ()
        local unit = GetEnumUnit()
        local amount = someReallyComplicatedDamageCalculationOrSomething(unit)
        UnitDamageTarget(unit, unit, amount, true, false, ATTACK_TYPE_CHAOS, DAMAGE_TYPE_DEATH, WEAPON_TYPE_WHOKNOWS)
    end)
    DestroyGroup(group)
end

---@param rect rect
function damageUnitsInRect(rect)
    local group = CreateGroup()
    GroupEnumUnitsInRect(group, rect, nil)
    ForGroup(group, function ()
        local unit = GetEnumUnit()
        myProcessor:enqueueTask(function()
            return someReallyComplicatedDamageCalculationOrSomething(unit)
        end, OP_COUNT):subscribe(
            function (damageAmount, delay)
                UnitDamageTarget(unit, unit, damageAmount, true, false, ATTACK_TYPE_CHAOS, DAMAGE_TYPE_DEATH, WEAPON_TYPE_WHOKNOWS)
            end)
    end)
    DestroyGroup(group)
end

This is still an unfinished resource as it's missing features like:
  • Task priority
  • Merging task buckets
  • Actually handling periodic tasks: the PERIOD parameter is pretty much useless for now, as the task is repeated every 0.02 seconds anyway.
  • Ability to actually use scheduling strategies instead of just a FIFO approach
Also, since the code can get quite messy and the user has to calculate each function's operation count manually, I'm thinking of having some sort of custom compiler inject the task enqueueing into user code instead of having them do it by hand.
I also need a list of op counts for natives.
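For the periodic-task item in the list above, one possible direction is to skip a task until its period has elapsed. The sketch below is only an assumption of how that could look, not what the library currently does: `isDue`, `periodTicks` and `lastRunTick` are made-up names, and time is counted in whole processor ticks to avoid float drift.

```lua
local PROCESSOR_PERIOD = 0.02

-- Hypothetical check: a periodic task is only due once its period has elapsed
-- since it last ran. One-shot tasks (no period) are always due.
local function isDue(task, currentTick)
    if not task.periodTicks then
        return true
    end
    return currentTick - task.lastRunTick >= task.periodTicks
end

-- A task that should run every 0.5 seconds, i.e. every 25 ticks.
local task = {
    periodTicks = math.floor(0.5 / PROCESSOR_PERIOD + 0.5),
    lastRunTick = 0,
    runs = 0,
}

-- Simulate one second of processor cycles (50 ticks of 0.02 seconds).
for tick = 1, 50 do
    if isDue(task, tick) then
        task.runs = task.runs + 1
        task.lastRunTick = tick
    end
end

print(task.runs) -- the task ran on ticks 25 and 50
```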

Attached to this post is a heavily modified version of Chopinski's Missile System that uses this library as a proof of concept. There's no apparent difference in-game between the original system and this one.
 

Attachments

  • Missiles Lua - remake.w3x
    1.3 MB · Views: 7
Level 19
Joined
Jan 3, 2022
Messages
320
Impressive, but do you think this approach is justified? I think in Lua it'd be more useful to provide an easier-to-use yieldable callback that runs part-time until finished (with a delta time from a timer to deal with the time that has passed).
"having the user calculate function's operation count manually" that's exactly the type of functions where I might bother to make a coroutine of. I would know exactly where to place a coroutine inside the function (probably to split a loop). But I don't like handling coroutines in Lua manually; it's too unwieldy and messy.
 
Level 6
Joined
Jun 30, 2017
Messages
41
Initially, I was going for a style of asynchronous programming I was familiar with when thinking about the API (ReactiveX Observables or Promise-Like in JS/TS), and I didn't know about coroutines at the time.
I'm thinking about making a coroutine-style version of this resource as well, but I think when I do make that custom compiler I will be using this Rx Observable style resource, since I'm not sure how I would go about automating coroutine injections in the code.
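For reference, a coroutine-style version of the "split a loop" idea could look roughly like the sketch below. It's plain Lua with no WC3 natives, so treat it as an illustration only: `makeSteppable` is a made-up helper, and the `repeat` loop stands in for a periodic timer that would call `step` once per tick.

```lua
-- Hypothetical helper: runs `fn` as a coroutine and returns a `step` function.
-- Each call to `step` resumes the work until its next coroutine.yield().
local function makeSteppable(fn)
    local co = coroutine.create(fn)
    return function()
        if coroutine.status(co) == "dead" then
            return true
        end
        local ok, err = coroutine.resume(co)
        if not ok then
            error(err, 0)
        end
        return coroutine.status(co) == "dead" -- true once the work has finished
    end
end

local processed = 0
local step = makeSteppable(function()
    for i = 1, 10 do
        processed = processed + 1 -- stand-in for the expensive per-item work
        if i % 4 == 0 then
            coroutine.yield() -- split the loop after every 4 items
        end
    end
end)

local cycles = 0
repeat
    cycles = cycles + 1 -- in a map this would be one timer tick
until step()

print(processed, cycles) -- all 10 items handled, spread over 3 cycles
```

The user decides where the yield goes, which is what makes this attractive for loops, but something still has to decide how many `step` calls fit into one tick.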
 

Bribe

Code Moderator
Level 50
Joined
Sep 26, 2009
Messages
9,464
Initially, I was going for a style of asynchronous programming I was familiar with when thinking about the API (ReactiveX Observables or Promise-Like in JS/TS), and I didn't know about coroutines at the time.
I'm thinking about making a coroutine-style version of this resource as well, but I think when I do make that custom compiler I will be using this Rx Observable style resource, since I'm not sure how I would go about automating coroutine injections in the code.
RxJS is completely insane. It took me months to get my head around how it works and why it's used in Angular instead of promises, and the learning curve is nuts.

I would expect the average WC3 modder to know what a "wait" is. Or a "wait for condition" or a "loop". That's why I'm banking on Lua-Infused GUI being the right approach. Build the engines in weird Lua code and let GUI just be intuitive and as powerful as possible. The more modders we have, the better the games will be. I'd be surprised if more than a handful of modders here know what RxJS or even Promises are.
 
Level 6
Joined
Jun 30, 2017
Messages
41
"having the user calculate function's operation count manually" that's exactly the type of functions where I might bother to make a coroutine of.

So, I've been racking my brain a bit over this, and I don't think I can just let go of the opCount metric even if I made the API rely on coroutines instead of ReactiveX. The issue is determining when the TaskProcessor should stop executing scheduled tasks and let the game tick occur.

To my understanding, Lua will not let the next game tick occur until the Lua thread running for that tick has finished (we've seen this with the game simply freezing in case of an infinite loop, whereas JASS crashes the thread once it runs over the OP limit). Therefore I can't use timers to figure out how much time has passed between the start of a task processor cycle and, well, the current time.

The next idea is using os.time() to figure out how much time has passed, but I'm not sure that will work. My concern is what happens if someone's CPU is much slower than the other players': it would take longer to execute the same actions. If the TaskProcessor relies on an os.time delta to decide whether a task in the bucket gets processed in this cycle or the next, wouldn't that create a potential desync?

Hypothetically, Player A has a worse CPU and manages to execute 300 tasks of varying workloads in 1-2 ms. Player B, with a better CPU, manages double that amount in the same time. I think there would be problems with this method.

The final solution I've come up with doesn't seem ideal to me either: hook every function in _G to track when it's being called, and have the task processor count function calls to determine whether a coroutine is resumed right after its next coroutine.yield or stays paused until the next cycle. But I think that adds a decent amount of overhead to the Lua runtime as a whole?
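To illustrate the hooking idea, here's a minimal sketch that wraps every function in a table so calls get counted. It deliberately uses a stand-in table instead of the real _G, and `hookAll`, `natives` and `callCount` are made-up names.

```lua
local callCount = 0

-- Replace every function in `tbl` with a wrapper that counts the call first.
local function hookAll(tbl)
    for name, value in pairs(tbl) do
        if type(value) == "function" then
            tbl[name] = function(...)
                callCount = callCount + 1
                return value(...)
            end
        end
    end
end

-- Stand-in for _G so the sketch doesn't touch any real globals.
local natives = {
    add = function(a, b) return a + b end,
    double = function(x) return x * 2 end,
}
hookAll(natives)

local result = natives.double(natives.add(1, 2))
print(result, callCount) -- 6 and 2: both calls went through a wrapper
```

Every hooked call pays for an extra function call plus the vararg forwarding, which is the overhead I'm worried about.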

The only workaround for this issue is to expect the user to give same-sized tasks to the processor, but I think that's unrealistic if there are going to be systems relying on this resource in the future, with varying workloads.

tl;dr: I think a coroutine-based API will still have to provide some metric for the TaskProcessor's decision making.

Edit: I just realized os.time doesn't return milliseconds, so that idea is out of the question :D
 

Dr Super Good

Spell Reviewer
Level 63
Joined
Jan 18, 2005
Messages
27,188
If I am understanding correctly the purpose of this tool is about limiting execution time of the Lua VM? If this is the case you might want to explain and elaborate this in the main topic.

Typically Lua does not need a scheduler because Warcraft III will complete all scheduled tasks before advancing game state. Since game progress is effectively paused until this is done, all tasks will end up executed in a seemingly fair way as standard. Pre-emption is not required either since task execution costs effectively no game time so you can wait until the currently scheduled task completes.
 