Inspired by the way Chopinski's Relativistic Missile System processes missiles, I've come up with the idea of mimicking an OS's process scheduling inside the Lua runtime. The goal is to eliminate "processing" lag, at the cost of no longer guaranteeing that code executes immediately when called.
At the moment it isn't as impressive as it sounds: there's no complicated scheduling strategy or task priority implemented. For now I've opted for JavaScript-style asynchronous code handling, similar to Promises, using Rx.lua.
TimerQueue & Stopwatch
Definitive Doubly-Linked List
Using information from this thread, and the fact that the game's JASS VM can run up to 1666666 executions per second, I've set an arbitrary limit on how many operations all processor instances can perform in a single game tick.
FUNCTION_OP_COUNT here is a placeholder that must be calculated by the developer. The TaskProcessor uses it to determine how many tasks fit into one execution bucket.
Each function is bound to have a different OP_COUNT. You can set it as a single constant used everywhere, but that probably won't be optimal.
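To make the budget idea concrete, here's a rough sketch of the arithmetic. It is not the actual TaskProcessor: `TICK_INTERVAL`, `BUDGET_SHARE`, the `FUNCTION_OP_COUNT` value of 200, and the helper names `tasksPerBucket`, `enqueue` and `processTick` are all made up for illustration. Only the 1666666 figure comes from the text above.

```lua
-- Illustrative constants; only VM_OPS_PER_SECOND is taken from the post.
local VM_OPS_PER_SECOND = 1666666  -- executions per second quoted above
local TICK_INTERVAL     = 0.02     -- assumed tick length in seconds (hypothetical)
local BUDGET_SHARE      = 0.5      -- assumed share of a tick given to processors (hypothetical)
local FUNCTION_OP_COUNT = 200      -- assumed cost of one task (hypothetical placeholder)

-- Operations that all processors together may spend in one tick.
local OPS_PER_TICK = math.floor(VM_OPS_PER_SECOND * TICK_INTERVAL * BUDGET_SHARE)

-- How many tasks of the given cost fit into one execution bucket (at least one).
local function tasksPerBucket(opCount)
  return math.max(1, math.floor(OPS_PER_TICK / opCount))
end

-- Simple FIFO queue with explicit head/tail indices.
local queue, head, tail = {}, 1, 0

local function enqueue(task)
  tail = tail + 1
  queue[tail] = task
end

-- Runs at most one bucket of queued tasks and returns how many were executed.
-- Whatever does not fit stays queued for a later tick.
local function processTick(opCount)
  local limit = tasksPerBucket(opCount)
  local executed = 0
  while executed < limit and head <= tail do
    local task = queue[head]
    queue[head] = nil
    head = head + 1
    task()
    executed = executed + 1
  end
  return executed
end

-- Usage: 200 queued tasks are spread over several ticks instead of running at once.
local counter = 0
for _ = 1, 200 do
  enqueue(function() counter = counter + 1 end)
end
local ranFirstTick = processTick(FUNCTION_OP_COUNT)  -- 83 with the assumed constants
```

With these assumed numbers the budget is 16666 operations per tick, so 83 tasks run in the first tick and the rest wait. A badly estimated FUNCTION_OP_COUNT shifts that number directly, which is why a single constant for every function probably won't be optimal.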
Here's an example of converting simple code into a more complicated version.
This is still an unfinished resource as it's missing features like:
Also need a list of op-counts for natives.
Attached to this post is a heavily modified version of Chopinski's Missile System that uses this library as a proof of concept. There's no apparent difference in-game between the original system and this one.
Lua:
-- RxLua v0.0.3
-- https://github.com/bjornbytes/rxlua
-- MIT License
-- Changed annotations to EmmyLua style and potentially fixed a few things.
do
local util = {}
util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
---@diagnostic disable-next-line: deprecated
util.unpack = table.unpack or unpack
util.eq = function(x, y) return x == y end
util.noop = function() end
util.identity = function(x) return x end
util.constant = function(x) return function() return x end end
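util.isa = function(object, class)
if type(object) ~= 'table' then return false end
-- Guard against tables without a metatable, which would otherwise raise an error.
local mt = getmetatable(object)
return mt ~= nil and mt.__index == class
end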
util.tryWithObserver = function(observer, fn, ...)
local success, result = pcall(fn, ...)
if not success then
observer:onError(result)
end
return success, result
end
--- @class Subscription
--- @field action function
--- @field unsubscribed boolean
-- A handle representing the link between an Observer and an Observable, as well as any
-- work required to clean up after the Observable completes or the Observer unsubscribes.
Subscription = {}
Subscription.__index = Subscription
Subscription.__tostring = util.constant('Subscription')
-- Creates a new Subscription.
---@param action fun(subscription: Subscription) action - The action to run when the subscription is unsubscribed. It will only be run once.
---@return Subscription
function Subscription.create(action)
local self = {
action = action or util.noop,
unsubscribed = false
}
return setmetatable(self, Subscription)
end
--- Unsubscribes the subscription, performing any necessary cleanup work.
function Subscription:unsubscribe()
if self.unsubscribed then return end
self.action(self)
self.unsubscribed = true
end
--- @class Observer
--- @field _onNext fun(values: ...)
--- @field _onError fun(message: any, level: integer?)
--- @field _onCompleted fun()
--- @field stopped boolean
-- Observers are simple objects that receive values from Observables.
Observer = {}
Observer.__index = Observer
Observer.__tostring = util.constant('Observer')
--- Creates a new Observer.
--- @param onNext fun(...)? Called when the Observable produces a value.
--- @param onError fun(message: string)? Called when the Observable terminates due to an error.
--- @param onCompleted fun()? Called when the Observable completes normally.
--- @return Observer
function Observer.create(onNext, onError, onCompleted)
local self = {
_onNext = onNext or util.noop,
_onError = onError or error,
_onCompleted = onCompleted or util.noop,
stopped = false
}
return setmetatable(self, Observer)
end
--- Pushes zero or more values to the Observer.
--- @param ... any
function Observer:onNext(...)
if not self.stopped then
self._onNext(...)
end
end
--- Notify the Observer that an error has occurred.
--- @param message string? A string describing what went wrong.
function Observer:onError(message)
if not self.stopped then
self.stopped = true
self._onError(message)
end
end
--- Notify the Observer that the sequence has completed and will produce no more values.
function Observer:onCompleted()
if not self.stopped then
self.stopped = true
self._onCompleted()
end
end
--- @class Observable
--- @field _subscribe function
-- Observables push values to Observers.
Observable = {}
Observable.__index = Observable
Observable.__tostring = util.constant('Observable')
--- Creates a new Observable.
--- @param subscribe fun(observer: Observer): any? The subscription function that produces values.
--- @return Observable
function Observable.create(subscribe)
local self = {
_subscribe = subscribe
}
return setmetatable(self, Observable)
end
--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
--- @param onNext table|fun(...)? Called when the Observable produces a value.
--- @param onError fun(message: string)? Called when the Observable terminates due to an error.
--- @param onCompleted fun()? Called when the Observable completes normally.
function Observable:subscribe(onNext, onError, onCompleted)
if type(onNext) == 'table' then
return self._subscribe(onNext)
else
return self._subscribe(Observer.create(onNext, onError, onCompleted))
end
end
--- Returns an Observable that immediately completes without producing a value.
function Observable.empty()
---@param observer Observer
return Observable.create(function(observer)
observer:onCompleted()
end)
end
--- Returns an Observable that never produces values and never completes.
function Observable.never()
return Observable.create(function(observer) end)
end
--- Returns an Observable that immediately produces an error.
--- @param message string
function Observable.throw(message)
--- @param observer Observer
return Observable.create(function(observer)
observer:onError(message)
end)
end
--- Creates an Observable that produces a set of values.
--- @param ... any
--- @return Observable
function Observable.of(...)
local args = {...}
local argCount = select('#', ...)
return Observable.create(function(observer)
for i = 1, argCount do
observer:onNext(args[i])
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces a range of values in a manner similar to a Lua for loop.
--- @param initial integer The first value of the range, or the upper limit if no other arguments are specified.
--- @param limit integer The second value of the range.
--- @param step integer An amount to increment the value by each iteration.
--- @return Observable
function Observable.fromRange(initial, limit, step)
if not limit and not step then
initial, limit = 1, initial
end
step = step or 1
return Observable.create(function(observer)
for i = initial, limit, step do
observer:onNext(i)
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces values from a table.
--- @generic K, V
--- @param t table The table used to create the Observable.
--- @param iterator fun(table: table<K, V>, index?: K):K, V An iterator used to iterate the table, e.g. pairs or ipairs.
--- @param keys boolean Whether or not to also emit the keys of the table.
--- @return Observable
function Observable.fromTable(t, iterator, keys)
iterator = iterator or pairs
---@param observer Observer
return Observable.create(function(observer)
for key, value in iterator(t) do
observer:onNext(value, keys and key or nil)
end
observer:onCompleted()
end)
end
--- Creates an Observable that produces values when the specified coroutine yields.
--- @param fn thread|function A coroutine or function to use to generate values. Note that if a coroutine is used, the values it yields will be shared by all subscribed Observers (influenced by the Scheduler), whereas a new coroutine will be created for each Observer when a function is used.
--- @param scheduler Scheduler The scheduler used to resume the coroutine.
--- @return Observable
function Observable.fromCoroutine(fn, scheduler)
return Observable.create(function(observer)
local thread = type(fn) == 'function' and coroutine.create(fn) or fn
return scheduler:schedule(function()
while not observer.stopped do
local success, value = coroutine.resume(thread --[[@as thread]])
if success then
observer:onNext(value)
else
return observer:onError(value)
end
if coroutine.status(thread --[[@as thread]]) == 'dead' then
return observer:onCompleted()
end
coroutine.yield()
end
end)
end)
end
-- this is banned!
----- Creates an Observable that produces values from a file, line by line.
----- @param filename string The name of the file used to create the Observable
----- @return Observable
-- function Observable.fromFileByLine(filename)
-- return Observable.create(function(observer)
-- local file = io.open(filename, 'r')
-- if file then
-- file:close()
-- for line in io.lines(filename) do
-- observer:onNext(line)
-- end
-- return observer:onCompleted()
-- else
-- return observer:onError(filename)
-- end
-- end)
-- end
--- Creates an Observable that creates a new Observable for each observer using a factory function.
--- @param fn fun():Observable A function that returns an Observable.
--- @return Observable
function Observable.defer(fn)
if not fn or type(fn) ~= 'function' then
error('Expected a function')
end
return setmetatable({
subscribe = function(_, ...)
local observable = fn()
return observable:subscribe(...)
end
}, Observable)
end
--- Returns an Observable that repeats a value a specified number of times.
--- @param value any - The value to repeat.
--- @param count number - The number of times to repeat the value. If left unspecified, the value is repeated an infinite number of times.
--- @return Observable
function Observable.replicate(value, count)
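return Observable.create(function(observer)
-- Use a per-subscription counter so that subscribing again repeats the value again.
local remaining = count
while remaining == nil or remaining > 0 do
observer:onNext(value)
if remaining then
remaining = remaining - 1
end
end
observer:onCompleted()
end)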
end
--- Subscribes to this Observable and prints values it produces.
--- @param name string - Prefixes the printed messages with a name.
--- @param formatter function - A function that formats one or more values to be printed. Defaults to tostring.
function Observable:dump(name, formatter)
name = name and (name .. ' ') or ''
formatter = formatter or tostring
local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
local onError = function(e) print(name .. 'onError: ' .. e) end
local onCompleted = function() print(name .. 'onCompleted') end
return self:subscribe(onNext, onError, onCompleted)
end
--- Determine whether all items emitted by an Observable meet some criteria.
--- @param predicate fun(x: any): any - The predicate used to evaluate objects. Default is util.identity
function Observable:all(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local function onNext(...)
util.tryWithObserver(observer, function(...)
if not predicate(...) then
observer:onNext(false)
observer:onCompleted()
end
end, ...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
observer:onNext(true)
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Given a set of Observables, produces values from only the first one to produce a value.
---@param a Observable
---@param b Observable
---@param ... Observable
---@return Observable
function Observable.amb(a, b, ...)
if not a or not b then return a end
return Observable.create(function(observer)
local subscriptionA, subscriptionB
local function onNextA(...)
if subscriptionB then subscriptionB:unsubscribe() end
observer:onNext(...)
end
local function onErrorA(e)
if subscriptionB then subscriptionB:unsubscribe() end
observer:onError(e)
end
local function onCompletedA()
if subscriptionB then subscriptionB:unsubscribe() end
observer:onCompleted()
end
local function onNextB(...)
if subscriptionA then subscriptionA:unsubscribe() end
observer:onNext(...)
end
local function onErrorB(e)
if subscriptionA then subscriptionA:unsubscribe() end
observer:onError(e)
end
local function onCompletedB()
if subscriptionA then subscriptionA:unsubscribe() end
observer:onCompleted()
end
subscriptionA = a:subscribe(onNextA, onErrorA, onCompletedA)
subscriptionB = b:subscribe(onNextB, onErrorB, onCompletedB)
return Subscription.create(function()
subscriptionA:unsubscribe()
subscriptionB:unsubscribe()
end)
end):amb(...)
end
--- Returns an Observable that produces the average of all values produced by the original.
--- @return Observable
function Observable:average()
return Observable.create(function(observer)
local sum, count = 0, 0
local function onNext(value)
sum = sum + value
count = count + 1
end
local function onError(e)
observer:onError(e)
end
local function onCompleted()
if count > 0 then
observer:onNext(sum / count)
end
observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that buffers values from the original and produces them as multiple
-- values.
--- @param size number - The size of the buffer.
function Observable:buffer(size)
if not size or type(size) ~= 'number' then
error('Expected a number')
end
return Observable.create(function(observer)
local buffer = {}
local function emit()
if #buffer > 0 then
observer:onNext(util.unpack(buffer))
buffer = {}
end
end
local function onNext(...)
local values = {...}
for i = 1, #values do
table.insert(buffer, values[i])
if #buffer >= size then
emit()
end
end
end
local function onError(message)
emit()
return observer:onError(message)
end
local function onCompleted()
emit()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that intercepts any errors from the previous and replaces them with values
-- produced by a new Observable.
--- @param handler function|Observable - An Observable or a function that returns an Observable to replace the source Observable in the event of an error.
--- @return Observable
function Observable:catch(handler)
handler = handler and (type(handler) == 'function' and handler or util.constant(handler))
return Observable.create(function(observer)
local subscription
local function onNext(...)
return observer:onNext(...)
end
local function onError(e)
if not handler then
return observer:onCompleted()
end
local success, _continue = pcall(handler, e)
if success and _continue then
if subscription then subscription:unsubscribe() end
_continue:subscribe(observer)
else
observer:onError(success and e or _continue)
end
end
local function onCompleted()
observer:onCompleted()
end
subscription = self:subscribe(onNext, onError, onCompleted)
return subscription
end)
end
--- Returns a new Observable that runs a combinator function on the most recent values from a set
-- of Observables whenever any of them produce a new value. The results of the combinator function
-- are produced by the new Observable.
--- @param ... Observable|function - One or more Observables to combine, optionally followed by a function that combines the latest result from each Observable and returns a single value.
--- @return Observable
function Observable:combineLatest(...)
local sources = {...}
local combinator = table.remove(sources)
if type(combinator) ~= 'function' then
table.insert(sources, combinator)
combinator = function(...) return ... end
end
table.insert(sources, 1, self)
return Observable.create(function(observer)
local latest = {}
local pending = {util.unpack(sources)}
local completed = {}
local subscription = {}
local function onNext(i)
return function(value)
latest[i] = value
pending[i] = nil
if not next(pending) then
util.tryWithObserver(observer, function()
observer:onNext(combinator(util.unpack(latest)))
end)
end
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted(i)
return function()
table.insert(completed, i)
if #completed == #sources then
observer:onCompleted()
end
end
end
for i = 1, #sources do
subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
end
return Subscription.create(function ()
for i = 1, #sources do
if subscription[i] then subscription[i]:unsubscribe() end
end
end)
end)
end
--- Returns a new Observable that produces the values of the first with falsy values removed.
--- @return Observable
function Observable:compact()
return self:filter(util.identity)
end
--- Returns a new Observable that produces the values produced by all the specified Observables in
-- the order they are specified.
--- @param other Observable - Observable to concatenate.
--- @param ... Observable - The Observables to concatenate.
--- @return Observable
function Observable:concat(other, ...)
if not other then return self end
local others = {...}
return Observable.create(function(observer)
local function onNext(...)
return observer:onNext(...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local function chain()
return other:concat(util.unpack(others)):subscribe(onNext, onError, onCompleted)
end
return self:subscribe(onNext, onError, chain)
end)
end
--- Returns a new Observable that produces a single boolean value representing whether or not the
-- specified value was produced by the original.
--- @param value any - The value to search for. == is used for equality testing.
--- @return Observable
function Observable:contains(value)
return Observable.create(function(observer)
local subscription
local function onNext(...)
local args = util.pack(...)
if #args == 0 and value == nil then
observer:onNext(true)
if subscription then subscription:unsubscribe() end
return observer:onCompleted()
end
for i = 1, #args do
if args[i] == value then
observer:onNext(true)
if subscription then subscription:unsubscribe() end
return observer:onCompleted()
end
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
observer:onNext(false)
return observer:onCompleted()
end
subscription = self:subscribe(onNext, onError, onCompleted)
return subscription
end)
end
--- Returns an Observable that produces a single value representing the number of values produced
-- by the source Observable that satisfy an optional predicate.
--- @param predicate function - The predicate used to match values.
function Observable:count(predicate)
predicate = predicate or util.constant(true)
return Observable.create(function(observer)
local count = 0
local function onNext(...)
util.tryWithObserver(observer, function(...)
if predicate(...) then
count = count + 1
end
end, ...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
observer:onNext(count)
observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new throttled Observable that waits to produce values until a timeout has expired, at
-- which point it produces the latest value from the source Observable. Whenever the source
-- Observable produces a value, the timeout is reset.
--- @param time number - An amount in milliseconds to wait before producing the last value.
--- @param scheduler Scheduler - The scheduler to run the Observable on.
--- @return Observable
function Observable:debounce(time, scheduler)
time = time or 0
return Observable.create(function(observer)
local debounced = {}
local function wrap(key)
return function(...)
if debounced[key] then
debounced[key]:unsubscribe()
end
local values = util.pack(...)
debounced[key] = scheduler:schedule(function()
return observer[key](observer, util.unpack(values))
end, time)
end
end
local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))
return Subscription.create(function()
if subscription then subscription:unsubscribe() end
for _, timeout in pairs(debounced) do
timeout:unsubscribe()
end
end)
end)
end
--- Returns a new Observable that produces a default set of items if the source Observable produces
-- no values.
--- @param ... any - Zero or more values to produce if the source completes without emitting anything.
--- @return Observable
function Observable:defaultIfEmpty(...)
local defaults = util.pack(...)
return Observable.create(function(observer)
local hasValue = false
local function onNext(...)
hasValue = true
observer:onNext(...)
end
local function onError(e)
observer:onError(e)
end
local function onCompleted()
if not hasValue then
observer:onNext(util.unpack(defaults))
end
observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces the values of the original delayed by a time period.
--- @param time number|function - An amount in milliseconds to delay by, or a function which returns this value.
--- @param scheduler Scheduler - The scheduler to run the Observable on.
--- @return Observable
function Observable:delay(time, scheduler)
time = type(time) ~= 'function' and util.constant(time) or time
return Observable.create(function(observer)
local actions = {}
local function delay(key)
return function(...)
local arg = util.pack(...)
local handle = scheduler:schedule(function()
observer[key](observer, util.unpack(arg))
end, time())
table.insert(actions, handle)
end
end
local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))
return Subscription.create(function()
if subscription then subscription:unsubscribe() end
for i = 1, #actions do
actions[i]:unsubscribe()
end
end)
end)
end
--- Returns a new Observable that produces the values from the original with duplicates removed.
--- @return Observable
function Observable:distinct()
return Observable.create(function(observer)
local values = {}
local function onNext(x)
if not values[x] then
observer:onNext(x)
end
values[x] = true
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that only produces values from the original if they are different from
-- the previous value.
--- @param comparator function - A function used to compare 2 values. If unspecified, == is used.
--- @return Observable
function Observable:distinctUntilChanged(comparator)
comparator = comparator or util.eq
return Observable.create(function(observer)
local first = true
local currentValue = nil
local function onNext(value, ...)
local values = util.pack(...)
util.tryWithObserver(observer, function()
if first or not comparator(value, currentValue) then
observer:onNext(value, util.unpack(values))
currentValue = value
first = false
end
end)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that produces the nth element produced by the source Observable.
--- @param index number - The index of the item, with an index of 1 representing the first.
--- @return Observable
function Observable:elementAt(index)
if not index or type(index) ~= 'number' then
error('Expected a number')
end
return Observable.create(function(observer)
local subscription
local i = 1
local function onNext(...)
if i == index then
observer:onNext(...)
observer:onCompleted()
if subscription then
subscription:unsubscribe()
end
else
i = i + 1
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
subscription = self:subscribe(onNext, onError, onCompleted)
return subscription
end)
end
--- Returns a new Observable that only produces values of the first that satisfy a predicate.
--- @param predicate function - The predicate used to filter values.
--- @return Observable
function Observable:filter(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local function onNext(...)
util.tryWithObserver(observer, function(...)
if predicate(...) then
return observer:onNext(...)
end
end, ...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces the first value of the original that satisfies a
-- predicate.
--- @param predicate function - The predicate used to find a value.
function Observable:find(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local function onNext(...)
util.tryWithObserver(observer, function(...)
if predicate(...) then
observer:onNext(...)
return observer:onCompleted()
end
end, ...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that only produces the first result of the original.
--- @return Observable
function Observable:first()
return self:take(1)
end
--- Returns a new Observable that transforms the items emitted by an Observable into Observables,
-- then flattens the emissions from those into a single Observable.
--- @param callback function - The function to transform values from the original Observable.
--- @return Observable
function Observable:flatMap(callback)
callback = callback or util.identity
return self:map(callback):flatten()
end
--- Returns a new Observable that uses a callback to create Observables from the values produced by
-- the source, then produces values from the most recent of these Observables.
--- @param callback function? - The function used to convert values to Observables. By default util.identity
--- @return Observable
function Observable:flatMapLatest(callback)
callback = callback or util.identity
return Observable.create(function(observer)
local innerSubscription
local function onNext(...)
observer:onNext(...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
local function subscribeInner(...)
if innerSubscription then
innerSubscription:unsubscribe()
end
return util.tryWithObserver(observer, function(...)
innerSubscription = callback(...):subscribe(onNext, onError)
end, ...)
end
local subscription = self:subscribe(subscribeInner, onError, onCompleted)
return Subscription.create(function()
if innerSubscription then
innerSubscription:unsubscribe()
end
if subscription then
subscription:unsubscribe()
end
end)
end)
end
--- Returns a new Observable that subscribes to the Observables produced by the original and
-- produces their values.
--- @return Observable
function Observable:flatten()
return Observable.create(function(observer)
local subscriptions = {}
local remaining = 1
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
remaining = remaining - 1
if remaining == 0 then
return observer:onCompleted()
end
end
local function onNext(observable)
local function innerOnNext(...)
observer:onNext(...)
end
remaining = remaining + 1
local subscription = observable:subscribe(innerOnNext, onError, onCompleted)
subscriptions[#subscriptions + 1] = subscription
end
subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #subscriptions do
subscriptions[i]:unsubscribe()
end
end)
end)
end
--- Returns an Observable that terminates when the source terminates but does not produce any
-- elements.
--- @return Observable
function Observable:ignoreElements()
return Observable.create(function(observer)
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(nil, onError, onCompleted)
end)
end
--- Returns a new Observable that only produces the last result of the original.
--- @return Observable
function Observable:last()
return Observable.create(function(observer)
local value
local empty = true
local function onNext(...)
value = {...}
empty = false
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
if not empty then
observer:onNext(util.unpack(value or {}))
end
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces the values of the original transformed by a function.
--- @param callback function - The function to transform values from the original Observable.
--- @return Observable
function Observable:map(callback)
return Observable.create(function(observer)
callback = callback or util.identity
local function onNext(...)
return util.tryWithObserver(observer, function(...)
return observer:onNext(callback(...))
end, ...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces the maximum value produced by the original.
--- @return Observable
function Observable:max()
return self:reduce(math.max)
end
--- Returns a new Observable that produces the values produced by all the specified Observables in
-- the order they are produced.
--- @param ... Observable - One or more Observables to merge.
--- @return Observable
function Observable:merge(...)
local sources = {...}
table.insert(sources, 1, self)
return Observable.create(function(observer)
local completed = {}
local subscriptions = {}
local function onNext(...)
return observer:onNext(...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted(i)
return function()
table.insert(completed, i)
if #completed == #sources then
observer:onCompleted()
end
end
end
for i = 1, #sources do
subscriptions[i] = sources[i]:subscribe(onNext, onError, onCompleted(i))
end
return Subscription.create(function ()
for i = 1, #sources do
if subscriptions[i] then subscriptions[i]:unsubscribe() end
end
end)
end)
end
--- Returns a new Observable that produces the minimum value produced by the original.
--- @return Observable
function Observable:min()
return self:reduce(math.min)
end
--- Returns an Observable that produces the values of the original inside tables.
--- @return Observable
function Observable:pack()
return self:map(util.pack)
end
--- Returns two Observables: one that produces values for which the predicate returns truthy,
-- and another that produces values for which the predicate returns falsy.
--- @param predicate function - The predicate used to partition the values.
--- @return Observable, Observable
function Observable:partition(predicate)
return self:filter(predicate), self:reject(predicate)
end
--- Returns a new Observable that produces values computed by extracting the given keys from the
-- tables produced by the original.
--- @param key string|number - The key to extract from the table.
--- @param ... string|number - Further keys, used to recursively pluck values from nested tables.
--- @return Observable
function Observable:pluck(key, ...)
if not key then return self end
if type(key) ~= 'string' and type(key) ~= 'number' then
return Observable.throw('pluck key must be a string or number')
end
return Observable.create(function(observer)
local function onNext(t)
return observer:onNext(t[key])
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end):pluck(...)
end
--- Returns a new Observable that produces a single value computed by accumulating the results of
-- running a function on each value produced by the original Observable.
--- @param accumulator function - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current values as the rest of the arguments.
--- @param seed any - A value to pass to the accumulator the first time it is run.
--- @return Observable
function Observable:reduce(accumulator, seed)
return Observable.create(function(observer)
local result = seed
local first = true
local function onNext(...)
if first and seed == nil then
result = ...
first = false
else
return util.tryWithObserver(observer, function(...)
result = accumulator(result, ...)
end, ...)
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
observer:onNext(result)
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces values from the original which do not satisfy a
-- predicate.
--- @param predicate function - The predicate used to reject values.
--- @return Observable
function Observable:reject(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local function onNext(...)
util.tryWithObserver(observer, function(...)
if not predicate(...) then
return observer:onNext(...)
end
end, ...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that restarts in the event of an error.
--- @param count number - The maximum number of times to retry. If left unspecified, an infinite number of retries will be attempted.
--- @return Observable
function Observable:retry(count)
return Observable.create(function(observer)
local subscription
local retries = 0
local function onNext(...)
return observer:onNext(...)
end
local function onCompleted()
return observer:onCompleted()
end
local function onError(message)
if subscription then
subscription:unsubscribe()
end
retries = retries + 1
if count and retries > count then
return observer:onError(message)
end
subscription = self:subscribe(onNext, onError, onCompleted)
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces its most recent value every time the specified observable
-- produces a value.
--- @param sampler Observable - The Observable that is used to sample values from this Observable.
--- @return Observable
function Observable:sample(sampler)
if not sampler then error('Expected an Observable') end
return Observable.create(function(observer)
local latest = {}
local function setLatest(...)
latest = util.pack(...)
end
local function onNext()
if #latest > 0 then
return observer:onNext(util.unpack(latest))
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local sourceSubscription = self:subscribe(setLatest, onError)
local sampleSubscription = sampler:subscribe(onNext, onError, onCompleted)
return Subscription.create(function()
if sourceSubscription then sourceSubscription:unsubscribe() end
if sampleSubscription then sampleSubscription:unsubscribe() end
end)
end)
end
--- Returns a new Observable that produces values computed by accumulating the results of running a
-- function on each value produced by the original Observable.
--- @param accumulator function - Accumulates the values of the original Observable. Will be passed the return value of the last call as the first argument and the current values as the rest of the arguments. Each value returned from this function will be emitted by the Observable.
--- @param seed any - A value to pass to the accumulator the first time it is run.
--- @return Observable
function Observable:scan(accumulator, seed)
return Observable.create(function(observer)
local result = seed
local first = true
local function onNext(...)
if first and seed == nil then
result = ...
first = false
else
return util.tryWithObserver(observer, function(...)
result = accumulator(result, ...)
observer:onNext(result)
end, ...)
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that skips over a specified number of values produced by the original
-- and produces the rest.
--- @param n number - The number of values to ignore.
--- @return Observable
function Observable:skip(n)
n = n or 1
return Observable.create(function(observer)
local i = 1
local function onNext(...)
if i > n then
observer:onNext(...)
else
i = i + 1
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that omits a specified number of values from the end of the original
-- Observable.
--- @param count number - The number of items to omit from the end.
--- @return Observable
function Observable:skipLast(count)
if not count or type(count) ~= 'number' then
error('Expected a number')
end
return Observable.create(function(observer)
local buffer = {} -- created per subscription so separate subscribers do not share buffered values
local function emit()
if #buffer > count and buffer[1] then
local values = table.remove(buffer, 1)
observer:onNext(util.unpack(values))
end
end
local function onNext(...)
emit()
table.insert(buffer, util.pack(...))
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
emit()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that skips over values produced by the original until the specified
-- Observable produces a value.
--- @param other Observable - The Observable that triggers the production of values.
--- @return Observable
function Observable:skipUntil(other)
return Observable.create(function(observer)
local triggered = false
local function trigger()
triggered = true
end
other:subscribe(trigger, trigger, trigger)
local function onNext(...)
if triggered then
observer:onNext(...)
end
end
local function onError(message)
if triggered then
observer:onError(message) -- forward the original error message instead of dropping it
end
end
local function onCompleted()
if triggered then
observer:onCompleted()
end
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that skips elements until the predicate returns falsy for one of them.
--- @param predicate function - The predicate used to continue skipping values.
--- @return Observable
function Observable:skipWhile(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local skipping = true
local function onNext(...)
if skipping then
util.tryWithObserver(observer, function(...)
skipping = predicate(...)
end, ...)
end
if not skipping then
return observer:onNext(...)
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces the specified values followed by all elements produced by
-- the source Observable.
--- @param ... any - The values to produce before the Observable begins producing values normally.
--- @return Observable
function Observable:startWith(...)
local values = util.pack(...)
return Observable.create(function(observer)
observer:onNext(util.unpack(values))
return self:subscribe(observer)
end)
end
--- Returns an Observable that produces a single value representing the sum of the values produced
-- by the original.
--- @return Observable
function Observable:sum()
return self:reduce(function(x, y) return x + y end, 0)
end
--- Given an Observable that produces Observables, returns an Observable that produces the values
-- produced by the most recently produced Observable.
--- @return Observable
function Observable:switch()
return Observable.create(function(observer)
local innerSubscription
local function onNext(...)
return observer:onNext(...)
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
local function switch(source)
if innerSubscription then
innerSubscription:unsubscribe()
end
innerSubscription = source:subscribe(onNext, onError, nil)
end
local subscription = self:subscribe(switch, onError, onCompleted)
return Subscription.create(function()
if innerSubscription then
innerSubscription:unsubscribe()
end
if subscription then
subscription:unsubscribe()
end
end)
end)
end
--- Returns a new Observable that only produces the first n results of the original.
--- @param n number - The number of elements to produce before completing.
--- @return Observable
function Observable:take(n)
n = n or 1
return Observable.create(function(observer)
if n <= 0 then
observer:onCompleted()
return
end
local i = 1
local function onNext(...)
observer:onNext(...)
i = i + 1
if i > n then
observer:onCompleted()
end
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that produces a specified number of elements from the end of a source
-- Observable.
--- @param count number - The number of elements to produce.
--- @return Observable
function Observable:takeLast(count)
if not count or type(count) ~= 'number' then
error('Expected a number')
end
return Observable.create(function(observer)
local buffer = {}
local function onNext(...)
table.insert(buffer, util.pack(...))
if #buffer > count then
table.remove(buffer, 1)
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
for i = 1, #buffer do
observer:onNext(util.unpack(buffer[i]))
end
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that completes when the specified Observable fires.
--- @param other Observable - The Observable that triggers completion of the original.
--- @return Observable
function Observable:takeUntil(other)
return Observable.create(function(observer)
local function onNext(...)
return observer:onNext(...)
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
other:subscribe(onCompleted, onCompleted, onCompleted)
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns a new Observable that produces elements until the predicate returns falsy.
--- @param predicate function - The predicate used to continue production of values.
--- @return Observable
function Observable:takeWhile(predicate)
predicate = predicate or util.identity
return Observable.create(function(observer)
local taking = true
local function onNext(...)
if taking then
util.tryWithObserver(observer, function(...)
taking = predicate(...)
end, ...)
if taking then
return observer:onNext(...)
else
return observer:onCompleted()
end
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Runs a function each time this Observable has activity. Similar to subscribe but does not
-- create a subscription.
--- @param _onNext fun(...)? - Run when the Observable produces values.
--- @param _onError fun(message:string)? - Run when the Observable encounters a problem.
--- @param _onCompleted function? - Run when the Observable completes.
--- @return Observable
function Observable:tap(_onNext, _onError, _onCompleted)
_onNext = _onNext or util.noop
_onError = _onError or util.noop
_onCompleted = _onCompleted or util.noop
return Observable.create(function(observer)
local function onNext(...)
util.tryWithObserver(observer, function(...)
_onNext(...)
end, ...)
return observer:onNext(...)
end
local function onError(message)
util.tryWithObserver(observer, function()
_onError(message)
end)
return observer:onError(message)
end
local function onCompleted()
util.tryWithObserver(observer, function()
_onCompleted()
end)
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that unpacks the tables produced by the original.
--- @return Observable
function Observable:unpack()
return self:map(util.unpack)
end
--- Returns an Observable that takes any values produced by the original that consist of multiple
-- return values and produces each value individually.
--- @return Observable
function Observable:unwrap()
return Observable.create(function(observer)
local function onNext(...)
local values = {...}
for i = 1, #values do
observer:onNext(values[i])
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that produces a sliding window of the values produced by the original.
--- @param size number - The size of the window. The returned observable will produce this number of the most recent values as multiple arguments to onNext.
--- @return Observable
function Observable:window(size)
if not size or type(size) ~= 'number' then
error('Expected a number')
end
return Observable.create(function(observer)
local window = {}
local function onNext(value)
table.insert(window, value)
if #window >= size then
observer:onNext(util.unpack(window))
table.remove(window, 1)
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted()
return observer:onCompleted()
end
return self:subscribe(onNext, onError, onCompleted)
end)
end
--- Returns an Observable that produces values from the original along with the most recently
-- produced value from all other specified Observables. Note that only the first argument from each
-- source Observable is used.
--- @param ... Observable - The Observables to include the most recent values from.
--- @return Observable
function Observable:with(...)
local sources = {...}
return Observable.create(function(observer)
local latest = setmetatable({}, {__len = util.constant(#sources)})
local subscriptions = {}
local function setLatest(i)
return function(value)
latest[i] = value
end
end
local function onNext(value)
return observer:onNext(value, util.unpack(latest))
end
local function onError(e)
return observer:onError(e)
end
local function onCompleted()
return observer:onCompleted()
end
for i = 1, #sources do
subscriptions[i] = sources[i]:subscribe(setLatest(i), util.noop, util.noop)
end
subscriptions[#sources + 1] = self:subscribe(onNext, onError, onCompleted)
return Subscription.create(function ()
for i = 1, #sources + 1 do
if subscriptions[i] then subscriptions[i]:unsubscribe() end
end
end)
end)
end
--- Returns an Observable that merges the values produced by the source Observables by grouping them
-- by their index. The first onNext event contains the first value of all of the sources, the
-- second onNext event contains the second value of all of the sources, and so on. onNext is called
-- a number of times equal to the number of values produced by the Observable that produces the
-- fewest number of values.
--- @param ... Observable - The Observables to zip.
--- @return Observable
function Observable.zip(...)
local sources = util.pack(...)
local count = #sources
return Observable.create(function(observer)
local values = {}
local active = {}
local subscriptions = {}
for i = 1, count do
values[i] = {n = 0}
active[i] = true
end
local function onNext(i)
return function(value)
table.insert(values[i], value)
values[i].n = values[i].n + 1
local ready = true
for i = 1, count do
if values[i].n == 0 then
ready = false
break
end
end
if ready then
local payload = {}
for i = 1, count do
payload[i] = table.remove(values[i], 1)
values[i].n = values[i].n - 1
end
observer:onNext(util.unpack(payload))
end
end
end
local function onError(message)
return observer:onError(message)
end
local function onCompleted(i)
return function()
active[i] = nil
if not next(active) or values[i].n == 0 then
return observer:onCompleted()
end
end
end
for i = 1, count do
subscriptions[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
end
return Subscription.create(function()
for i = 1, count do
if subscriptions[i] then subscriptions[i]:unsubscribe() end
end
end)
end)
end
--- @class ImmediateScheduler
-- @description Schedules Observables by running all operations immediately.
ImmediateScheduler = {}
ImmediateScheduler.__index = ImmediateScheduler
ImmediateScheduler.__tostring = util.constant('ImmediateScheduler')
--- Creates a new ImmediateScheduler.
--- @return ImmediateScheduler
function ImmediateScheduler.create()
return setmetatable({}, ImmediateScheduler)
end
--- Schedules a function to be run on the scheduler. It is executed immediately.
--- @param action function - The function to execute.
function ImmediateScheduler:schedule(action)
action()
end
--- @class CooperativeScheduler
--- @field tasks {thread: thread, due: number}[]
-- @description Manages Observables using coroutines and a virtual clock that must be updated
-- manually.
CooperativeScheduler = {}
CooperativeScheduler.__index = CooperativeScheduler
CooperativeScheduler.__tostring = util.constant('CooperativeScheduler')
--- Creates a new CooperativeScheduler.
--- @param currentTime number - A time to start the scheduler at.
--- @return CooperativeScheduler
function CooperativeScheduler.create(currentTime)
local self = {
tasks = {},
currentTime = currentTime or 0
}
return setmetatable(self, CooperativeScheduler)
end
--- Schedules a function to be run after an optional delay. Returns a subscription that will stop
-- the action from running.
--- @param action function - The function to execute. Will be converted into a coroutine. The coroutine may yield execution back to the scheduler with an optional number, which will put it to sleep for a time period.
--- @param delay number - Delay execution of the action by a virtual time period.
--- @return Subscription
function CooperativeScheduler:schedule(action, delay)
local task = {
thread = coroutine.create(action),
due = self.currentTime + (delay or 0)
}
table.insert(self.tasks, task)
return Subscription.create(function()
return self:unschedule(task)
end)
end
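--- Removes a previously scheduled task from the queue.
--- @param task {thread: thread, due: number} - The task entry created by schedule.
function CooperativeScheduler:unschedule(task)
for i = 1, #self.tasks do
if self.tasks[i] == task then
table.remove(self.tasks, i)
return -- stop here: the table shifted, so do not keep iterating
end
end
end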
--- Triggers an update of the CooperativeScheduler. The clock will be advanced and the scheduler
-- will run any coroutines that are due to be run.
--- @param delta number - An amount of time to advance the clock by. It is common to pass in the time in seconds or milliseconds elapsed since this function was last called.
function CooperativeScheduler:update(delta)
self.currentTime = self.currentTime + (delta or 0)
local i = 1
while i <= #self.tasks do
local task = self.tasks[i]
if self.currentTime >= task.due then
local success, delay = coroutine.resume(task.thread)
if coroutine.status(task.thread) == 'dead' then
table.remove(self.tasks, i)
else
task.due = math.max(task.due + (delay or 0), self.currentTime)
i = i + 1
end
if not success then
error(delay)
end
else
i = i + 1
end
end
end
--- Returns whether or not the CooperativeScheduler's queue is empty.
function CooperativeScheduler:isEmpty()
return not next(self.tasks)
end
--- @class TimeoutScheduler
-- @description A scheduler that uses luvit's timer library to schedule events on an event loop.
TimeoutScheduler = {}
TimeoutScheduler.__index = TimeoutScheduler
TimeoutScheduler.__tostring = util.constant('TimeoutScheduler')
--- Creates a new TimeoutScheduler.
--- @return TimeoutScheduler
function TimeoutScheduler.create()
return setmetatable({}, TimeoutScheduler)
end
--- Schedules an action to run at a future point in time.
--- @param action function - The action to run.
--- @param delay number - The delay, in milliseconds.
--- @return Subscription
function TimeoutScheduler:schedule(action, delay, ...)
local timer = require 'timer'
local handle = timer.setTimeout(delay, action, ...)
return Subscription.create(function()
timer.clearTimeout(handle)
end)
end
--- @class Subject : Observer, Observable
--- @field observers Observer[]
-- Subjects function both as an Observer and as an Observable. Subjects inherit all
-- Observable functions, including subscribe. Values can also be pushed to the Subject, which will
-- be broadcasted to any subscribed Observers.
Subject = setmetatable({}, Observable)
Subject.__index = Subject
Subject.__tostring = util.constant('Subject')
--- Creates a new Subject.
--- @return Subject
function Subject.create()
return setmetatable({
observers = {},
stopped = false
}, Subject) --[[@as Subject]]
end
--- Creates a new Observer and attaches it to the Subject.
--- @param onNext fun(...)|Observer - A function called when the Subject produces a value or an existing Observer to attach to the Subject.
--- @param onError function? - Called when the Subject terminates due to an error.
--- @param onCompleted function? - Called when the Subject completes normally.
function Subject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext --[[@as fun(...)]], onError, onCompleted)
end
table.insert(self.observers, observer)
return Subscription.create(function()
for i = 1, #self.observers do
if self.observers[i] == observer then
table.remove(self.observers, i)
return
end
end
end)
end
--- Pushes zero or more values to the Subject. They will be broadcasted to all Observers.
--- @param ... any
function Subject:onNext(...)
if not self.stopped then
for i = #self.observers, 1, -1 do
self.observers[i]:onNext(...)
end
end
end
--- Signal to all Observers that an error has occurred.
--- @param message string - A string describing what went wrong.
function Subject:onError(message)
if not self.stopped then
for i = #self.observers, 1, -1 do
self.observers[i]:onError(message)
end
self.stopped = true
end
end
--- Signal to all Observers that the Subject will not produce any more values.
function Subject:onCompleted()
if not self.stopped then
for i = #self.observers, 1, -1 do
self.observers[i]:onCompleted()
end
self.stopped = true
end
end
Subject.__call = Subject.onNext
--- @class AsyncSubject: Subject
-- @description AsyncSubjects are subjects that produce either no values or a single value. If
-- multiple values are produced via onNext, only the last one is used. If onError is called, then
-- no value is produced and onError is called on any subscribed Observers. If an Observer
-- subscribes and the AsyncSubject has already terminated, the Observer will immediately receive the
-- value or the error.
AsyncSubject = setmetatable({}, Observable)
AsyncSubject.__index = AsyncSubject
AsyncSubject.__tostring = util.constant('AsyncSubject')
--- Creates a new AsyncSubject.
--- @return AsyncSubject
function AsyncSubject.create()
local self = {
observers = {},
stopped = false,
value = nil,
errorMessage = nil
}
return setmetatable(self, AsyncSubject) --[[@as AsyncSubject]]
end
--- Creates a new Observer and attaches it to the AsyncSubject.
--- @param onNext fun(...)|Observer - A function called when the AsyncSubject produces a value or an existing Observer to attach to the AsyncSubject.
--- @param onError function? - Called when the AsyncSubject terminates due to an error.
--- @param onCompleted function? - Called when the AsyncSubject completes normally.
function AsyncSubject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
if self.value then
observer:onNext(util.unpack(self.value))
observer:onCompleted()
return
elseif self.errorMessage then
observer:onError(self.errorMessage)
return
end
table.insert(self.observers, observer)
return Subscription.create(function()
for i = 1, #self.observers do
if self.observers[i] == observer then
table.remove(self.observers, i)
return
end
end
end)
end
--- Pushes zero or more values to the AsyncSubject.
--- @param ... any
function AsyncSubject:onNext(...)
if not self.stopped then
self.value = util.pack(...)
end
end
--- Signal to all Observers that an error has occurred.
--- @param message string - A string describing what went wrong.
function AsyncSubject:onError(message)
if not self.stopped then
self.errorMessage = message
for i = 1, #self.observers do
self.observers[i]:onError(self.errorMessage)
end
self.stopped = true
end
end
--- Signal to all Observers that the AsyncSubject will not produce any more values.
function AsyncSubject:onCompleted()
if not self.stopped then
for i = 1, #self.observers do
if self.value then
self.observers[i]:onNext(util.unpack(self.value))
end
self.observers[i]:onCompleted()
end
self.stopped = true
end
end
AsyncSubject.__call = AsyncSubject.onNext
--- @class BehaviorSubject : Subject
-- @description A Subject that tracks its current value. Provides an accessor to retrieve the most
-- recent pushed value, and all subscribers immediately receive the latest value.
BehaviorSubject = setmetatable({}, Subject)
BehaviorSubject.__index = BehaviorSubject
BehaviorSubject.__tostring = util.constant('BehaviorSubject')
--- Creates a new BehaviorSubject.
--- @param ... any - The initial values.
--- @return BehaviorSubject
function BehaviorSubject.create(...)
local self = {
observers = {},
stopped = false
}
if select('#', ...) > 0 then
self.value = util.pack(...)
end
return setmetatable(self, BehaviorSubject) --[[@as BehaviorSubject]]
end
--- Creates a new Observer and attaches it to the BehaviorSubject. Immediately broadcasts the most
-- recent value to the Observer.
--- @param onNext function - Called when the BehaviorSubject produces a value.
--- @param onError function - Called when the BehaviorSubject terminates due to an error.
--- @param onCompleted function - Called when the BehaviorSubject completes normally.
function BehaviorSubject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
local subscription = Subject.subscribe(self, observer)
if self.value then
observer:onNext(util.unpack(self.value))
end
return subscription
end
--- Pushes zero or more values to the BehaviorSubject. They will be broadcasted to all Observers.
--- @param ... any
function BehaviorSubject:onNext(...)
self.value = util.pack(...)
return Subject.onNext(self, ...)
end
--- Returns the last value emitted by the BehaviorSubject, or the initial value passed to the
-- constructor if nothing has been emitted yet.
--- @return any
function BehaviorSubject:getValue()
if self.value ~= nil then
return util.unpack(self.value)
end
end
BehaviorSubject.__call = BehaviorSubject.onNext
--- @class ReplaySubject : Subject
--- @field buffer any[]
--- @field bufferSize integer
-- @description A Subject that provides new Subscribers with some or all of the most recently
-- produced values upon subscription.
ReplaySubject = setmetatable({}, Subject)
ReplaySubject.__index = ReplaySubject
ReplaySubject.__tostring = util.constant('ReplaySubject')
--- Creates a new ReplaySubject.
--- @param bufferSize number - The number of values to send to new subscribers. If nil, an infinite buffer is used (note that this could lead to memory issues).
--- @return ReplaySubject
function ReplaySubject.create(bufferSize)
local self = {
observers = {},
stopped = false,
buffer = {},
bufferSize = bufferSize
}
return setmetatable(self, ReplaySubject) --[[@as ReplaySubject]]
end
--- Creates a new Observer and attaches it to the ReplaySubject. Immediately broadcasts the
-- contents of the buffer to the Observer.
--- @param onNext function - Called when the ReplaySubject produces a value.
--- @param onError function - Called when the ReplaySubject terminates due to an error.
--- @param onCompleted function - Called when the ReplaySubject completes normally.
function ReplaySubject:subscribe(onNext, onError, onCompleted)
local observer
if util.isa(onNext, Observer) then
observer = onNext
else
observer = Observer.create(onNext, onError, onCompleted)
end
local subscription = Subject.subscribe(self, observer)
for i = 1, #self.buffer do
observer:onNext(util.unpack(self.buffer[i]))
end
return subscription
end
--- Pushes zero or more values to the ReplaySubject. They will be broadcasted to all Observers.
--- @param ... any
function ReplaySubject:onNext(...)
table.insert(self.buffer, util.pack(...))
if self.bufferSize and #self.buffer > self.bufferSize then
table.remove(self.buffer, 1)
end
return Subject.onNext(self, ...)
end
ReplaySubject.__call = ReplaySubject.onNext
Observable.wrap = Observable.buffer
Observable['repeat'] = Observable.replicate
end
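The CooperativeScheduler in the library above is the piece closest to the scheduling idea of this thread: actions become coroutines, and a virtual clock decides when each one is resumed. Below is a minimal standalone sketch of that pattern. `MiniScheduler` is a hypothetical name and not part of Rx.lua, and the due-time arithmetic is simplified compared to the library's `update`.

```lua
-- Standalone sketch of coroutine scheduling on a virtual clock.
-- MiniScheduler is a hypothetical stand-in, not the library's CooperativeScheduler.
local MiniScheduler = {}
MiniScheduler.__index = MiniScheduler

function MiniScheduler.create()
  return setmetatable({ tasks = {}, now = 0 }, MiniScheduler)
end

-- Wrap the action in a coroutine and remember when it is next due.
function MiniScheduler:schedule(action, delay)
  table.insert(self.tasks, {
    thread = coroutine.create(action),
    due = self.now + (delay or 0),
  })
end

-- Advance the virtual clock and resume every task that is due.
function MiniScheduler:update(delta)
  self.now = self.now + delta
  local i = 1
  while i <= #self.tasks do
    local task = self.tasks[i]
    if self.now >= task.due then
      local ok, sleep = coroutine.resume(task.thread)
      if not ok then error(sleep) end
      if coroutine.status(task.thread) == 'dead' then
        table.remove(self.tasks, i) -- finished: drop it, do not advance i
      else
        task.due = self.now + (sleep or 0) -- yielded: sleep for the yielded time
        i = i + 1
      end
    else
      i = i + 1
    end
  end
end

local log = {}
local scheduler = MiniScheduler.create()
scheduler:schedule(function()
  table.insert(log, 'step 1')
  coroutine.yield(2) -- sleep for 2 units of virtual time
  table.insert(log, 'step 2')
end)

scheduler:update(0) -- t = 0: runs 'step 1', the task then sleeps until t = 2
scheduler:update(1) -- t = 1: nothing is due
scheduler:update(1) -- t = 2: runs 'step 2', the task finishes and is removed
```

Nothing runs until `update` is called, so whoever drives the clock (for example a periodic game timer) decides how much work happens per tick.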
Lua:
-- Scheduler for Rx.Lua using TimerQueue & Stopwatch library
---@class Scheduler
---@field tasks Observer
local Scheduler = {}
local timerQueue
function Scheduler.create()
timerQueue = TimerQueue.create()
return Scheduler -- callers expect an instance, so the shared Scheduler table itself is returned
end
---@param action function
---@param delay number
function Scheduler:schedule(action, delay)
return timerQueue:callDelayed(delay, action)
end
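The adapter above keeps a single `timerQueue` at module level and hands back the `Scheduler` table itself. If separate schedulers should ever own separate queues, a per-instance variant could look roughly like the sketch below. `FakeTimerQueue`, its `advance` helper, and `InstanceScheduler` are hypothetical names used only so the sketch runs outside the game; the only call taken from the code above is `callDelayed(delay, action)`.

```lua
-- FakeTimerQueue is a hypothetical stand-in for the real TimerQueue library.
-- Only the callDelayed(delay, action) call shape mirrors the adapter above.
local FakeTimerQueue = {}
FakeTimerQueue.__index = FakeTimerQueue

function FakeTimerQueue.create()
  return setmetatable({ now = 0, pending = {} }, FakeTimerQueue)
end

function FakeTimerQueue:callDelayed(delay, action)
  table.insert(self.pending, { due = self.now + delay, action = action })
end

-- Hypothetical helper: advance the fake clock and fire everything that is due.
function FakeTimerQueue:advance(dt)
  self.now = self.now + dt
  local remaining = {}
  for _, entry in ipairs(self.pending) do
    if entry.due <= self.now then
      entry.action()
    else
      table.insert(remaining, entry)
    end
  end
  self.pending = remaining
end

-- Per-instance variant of the adapter: each scheduler owns its queue.
local InstanceScheduler = {}
InstanceScheduler.__index = InstanceScheduler

function InstanceScheduler.create(queue)
  return setmetatable({ queue = queue }, InstanceScheduler)
end

function InstanceScheduler:schedule(action, delay)
  return self.queue:callDelayed(delay or 0, action)
end

local fired = {}
local queue = FakeTimerQueue.create()
local scheduler = InstanceScheduler.create(queue)
scheduler:schedule(function() table.insert(fired, 'late') end, 0.5)
scheduler:schedule(function() table.insert(fired, 'early') end, 0.25)

queue:advance(0.25) -- only the 0.25s action is due
queue:advance(0.25) -- now the 0.5s action fires as well
```

Whether this is worth the extra table per scheduler depends on how many independent queues the map actually needs; for a single global queue the original adapter is simpler.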
Definitive Doubly-Linked List
Lua:
if Debug then Debug.beginFile "TaskObservable" end
do
---@class TaskObservable : Observable
---@field _subscribe fun(observer: TaskObserver): any?
-- Observables push values to Observers.
TaskObservable = {}
TaskObservable.__index = TaskObservable
--- Creates a new TaskObservable.
---@param subscribe fun(observer: TaskObserver): any? The subscription function that produces values.
---@return TaskObservable
function TaskObservable.create(subscribe)
return setmetatable(Observable.create(subscribe), TaskObservable) --[[@as TaskObservable]]
end
--- Shorthand for creating a TaskObserver and passing it to this TaskObservable's subscription function.
---@generic T
---@param onNext table|fun(value: T, delay: number) Called when the TaskObservable produces a value.
---@param onError fun(message: string, delay: number)? Called when the TaskObservable terminates due to an error.
---@param onCompleted fun(delay: number)? Called when the TaskObservable completes normally.
function TaskObservable:subscribe(onNext, onError, onCompleted)
if type(onNext) == 'table' then
return self._subscribe(onNext)
else
return self._subscribe(TaskObserver.create(onNext, onError, onCompleted))
end
end
end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskObserver" end
do
---@class TaskObserver : Observer
---@field _onNext fun(value: unknown, delay: number)?
---@field _onError fun(message: string, delay: number)?
---@field _onCompleted fun(delay: number)?
TaskObserver = {}
TaskObserver.__index = TaskObserver
setmetatable(TaskObserver, Observer) -- inherit from Observer, matching the @class annotation above
---@generic T
---@param onNext fun(value: T, delay: number)
---@param onError fun(message: string, delay: number)?
---@param onCompleted fun(delay: number)?
---@return TaskObserver
function TaskObserver.create(onNext, onError, onCompleted)
return setmetatable(Observer.create(onNext, onError, onCompleted), TaskObserver) --[[@as TaskObserver]]
end
--- Pushes a value to the Observer.
---@generic T
---@param value T
---@param delay number
function TaskObserver:onNext(value, delay)
if not self.stopped then
self._onNext(value, delay)
end
end
--- Notify the Observer that an error has occurred.
---@param message string A string describing what went wrong.
---@param delay number
function TaskObserver:onError(message, delay)
if not self.stopped then
self.stopped = true
self._onError(message, delay)
end
end
--- Notify the Observer that the sequence has completed and will produce no more values.
---@param delay number
function TaskObserver:onCompleted(delay)
if not self.stopped then
self.stopped = true
self._onCompleted(delay)
end
end
end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskSubject" end
do
---@class TaskSubject : AsyncSubject
---@field observers TaskObserver[]
TaskSubject = {}
TaskSubject.__index = TaskSubject
setmetatable(TaskSubject, AsyncSubject)
---@generic T
---@param task Task<T>
---@return TaskSubject
function TaskSubject.create(task)
-- Named 'instance' so the local does not shadow the TaskSubject class table.
local instance = setmetatable({
taskRef = task,
observers = {}
}, TaskSubject)
return instance --[[@as TaskSubject]]
end
--- Creates a new Observer and attaches it to the TaskSubject.
---@generic T
---@param onNext TaskObserver|fun(value: T, delay: number) A function called when the TaskSubject produces a value, or an existing TaskObserver to attach to the TaskSubject.
---@param onError fun(message: string, delay: number)? Called when the TaskSubject terminates due to an error.
---@param onCompleted fun(delay: number)? Called when the TaskSubject completes normally.
---@return Subscription?
function TaskSubject:subscribe(onNext, onError, onCompleted)
local observer ---@type TaskObserver
if type(onNext) == 'table' then
observer = onNext -- an existing observer was passed in, attach it as-is
else
observer = TaskObserver.create(onNext, onError, onCompleted)
end
if self.value then
observer:onNext(self.value, self.delay)
observer:onCompleted(self.delay)
return
elseif self.errorMessage then
observer:onError(self.errorMessage, self.delay)
return
end
table.insert(self.observers, observer)
return Subscription.create(function()
for i = 1, #self.observers do
if self.observers[i] == observer then
table.remove(self.observers, i)
return
end
end
end)
end
--- Pushes a value to the TaskSubject.
---@generic T
---@param value T
---@param delay number
function TaskSubject:onNext(value, delay)
if not self.stopped then
self.value = value
self.delay = delay
end
end
--- Signal to all Observers that an error has occurred.
---@param message string - A string describing what went wrong.
---@param delay number
function TaskSubject:onError(message, delay)
if not self.stopped then
self.errorMessage = message
self.delay = delay -- remembered so that late subscribers receive the same delay
for i = 1, #self.observers do
self.observers[i]:onError(self.errorMessage, delay)
end
self.stopped = true
end
end
--- Signal to all Observers that the TaskSubject will not produce any more values.
---@param delay number
function TaskSubject:onCompleted(delay)
if not self.stopped then
for i = 1, #self.observers do
if self.value then
self.observers[i]:onNext(self.value, self.delay)
end
self.observers[i]:onCompleted(delay)
end
self.stopped = true
end
end
end
if Debug then Debug.endFile() end
Lua:
if Debug then Debug.beginFile "TaskProcessor" end
-- Requires: TimerQueue & Stopwatch, Definitive Doubly-Linked List and RxLua
do
local WC3_OP_LIMIT = 1666666 -- JASS's max op limit, used as a reference for how many operations Lua can run without in-game lag. Very experimental and not yet confirmed.
-- Fraction of WC3_OP_LIMIT that all processors combined may use up.
-- The leftover is reserved for the game's internal machinations and for triggers not registered with any processor.
local GLOBAL_PROCESS_RATIO = 0.8 -- from 0.0 to 1.0
local PROCESSOR_PERIOD = 0.02
local DELAY_MULTIPLIER = 1/PROCESSOR_PERIOD
-- ========================================= --
-- Internal variables --
-- ========================================= --
local globalProcessOpLimit = WC3_OP_LIMIT*GLOBAL_PROCESS_RATIO*PROCESSOR_PERIOD
local processors = {} --- @type Processor[]
local function refreshProcessorsOpLimits()
if #processors == 0 then return end
local totalRatio = 0
for _, processor in ipairs(processors) do
totalRatio = totalRatio + processor.ratio;
end
for _, processor in ipairs(processors) do
processor.opLimit = globalProcessOpLimit*(processor.ratio/totalRatio)
end
end
---@class Task
---@field fn fun(delay: number)
---@field opCount integer
---@field promise TaskSubject
---@field requestTime number
Task = {}
Task.__index = Task
---@param fn fun(delay: number)
---@param opCount integer
---@param currentTime number
---@return Task
local function createTask(fn, opCount, currentTime)
return setmetatable({
fn = fn,
opCount = opCount,
requestTime = currentTime
}, Task)
end
---@class PeriodicTask : Task
---@field period number
---@field fn fun(delay: number): boolean done
PeriodicTask = setmetatable({}, Task)
PeriodicTask.__index = PeriodicTask
---@param task Task
---@param period number
---@return PeriodicTask
local function createPeriodicTask(task, period)
task.period = period
return setmetatable(task, PeriodicTask) --[[@as PeriodicTask]]
end
---@class TaskBucket
---@field tasks LinkedListHead
---@field opCount integer
---@param processor Processor
---@param task Task
local function enqueueToAvailableTaskBucket(processor, task)
if processor.taskExecutor.paused then
processor.taskExecutor:resume()
end
local chosenBucket ---@type TaskBucket?
-- Pick the first bucket that still has room for this task.
for _, bucket in ipairs(processor.taskBuckets) do
if bucket.opCount + task.opCount <= processor.opLimit then
chosenBucket = bucket
break
end
end
if chosenBucket == nil then
-- No bucket has room: open a new one that holds only this task.
local tasks = LinkedList.create()
tasks:insert(task)
table.insert(processor.taskBuckets, {tasks = tasks, opCount = task.opCount} --[[@as TaskBucket]])
else
chosenBucket.tasks:insert(task)
chosenBucket.opCount = chosenBucket.opCount + task.opCount
end
end
---@class Processor
---@field ratio integer -- readOnly
---@field opLimit integer -- readOnly, how many operations can this processor do in a game-tick
---@field taskBuckets TaskBucket[] -- readOnly, don't touch!
---@field taskExecutor TimerQueue -- readOnly
---@field clock Stopwatch -- readOnly
---@field currentBucketIndex integer -- readOnly
Processor = {}
Processor.__index = Processor
---@param bucket TaskBucket
---@param task LinkedListNode
local function dequeueTask(bucket, task)
bucket.opCount = bucket.opCount - task.value--[[@as Task]].opCount
task:remove()
end
---@param processor Processor
---@param bucket TaskBucket
---@param currentTime number
local function processTasks(processor, bucket, currentTime)
if bucket.tasks.n > 0 then
for taskNode in bucket.tasks:loop() do
local task = taskNode.value ---@type Task
local delay = (currentTime - task.requestTime)*DELAY_MULTIPLIER
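local status, result = pcall(task.fn, delay)
if not status then
-- A failed task is removed so it is not retried every tick and the bucket can empty out.
dequeueTask(bucket, taskNode)
task.promise:onError(result, delay)
elseif task--[[@as PeriodicTask]].period and result == false then
-- Periodic task that is not done yet: record the value and keep it queued.
task.promise:onNext(result, delay)
task.requestTime = currentTime
else
task.promise:onNext(result, delay)
dequeueTask(bucket, taskNode)
task.promise:onCompleted(delay)
end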
end
end
end
---@param processor Processor
local function process(processor)
local bucketAmount = #processor.taskBuckets
if bucketAmount == 0 then
processor.taskExecutor:pause()
return
elseif processor.currentBucketIndex > bucketAmount then
processor.currentBucketIndex = 1
end
local bucket = processor.taskBuckets[processor.currentBucketIndex]
processTasks(processor, bucket, processor.clock:getElapsed())
if bucket.opCount == 0 then
table.remove(processor.taskBuckets, processor.currentBucketIndex)
if bucketAmount - 1 == 0 then
processor.taskExecutor:pause()
processor.currentBucketIndex = 1
end
else
processor.currentBucketIndex = processor.currentBucketIndex + 1
end
end
---@param fn fun(delay: number):boolean done
---@param fnOpCount integer
---@param period number?
---@return TaskObservable
function Processor:enqueueTask(fn, fnOpCount, period)
assert(type(fn) == "function", "Parameter 'fn' must be a function.")
assert(type(fnOpCount) == "number" and math.floor(fnOpCount) == fnOpCount, "Parameter 'fnOpCount' must be an integer.")
local task = createTask(fn, fnOpCount, self.clock:getElapsed())
if period ~= nil then
assert(type(period) == 'number' and period > 0, "Parameter 'period' must be a number and higher than 0.")
task = createPeriodicTask(task, period)
end
task.promise = TaskSubject.create(task)
enqueueToAvailableTaskBucket(self, task)
return task.promise --[[@as TaskObservable]]
end
---@param ratio integer integer higher or equal to 1, will re-adjust this new processor and other existing processors op limit in comparison to these numbers.
---@return Processor
function Processor.create(ratio)
assert((type(ratio) == "number" and math.floor(ratio) == ratio), "Parameter 'ratio' must be an integer")
assert(ratio >= 1, "Parameter 'ratio' must be higher or equal to 1")
local instance = setmetatable({
ratio = ratio,
-- opLimit is set by refreshProcessorsOpLimits
taskBuckets = {},
taskExecutor = TimerQueue.create(),
currentBucketIndex = 1,
clock = Stopwatch.create(true)
}, Processor)
table.insert(processors, instance)
refreshProcessorsOpLimits()
instance.taskExecutor:pause();
instance.taskExecutor:callPeriodically(PROCESSOR_PERIOD, nil, process, instance)
return instance
end
end
if Debug then Debug.endFile() end
Using information from this thread and the fact that the JASS VM can run up to 1666666 operations per second, I've set an arbitrary limit on how many operations all processor instances combined can perform in a single game tick.
local WC3_OP_LIMIT = 1666666 -- JASS's max op limit, used as a reference for how many operations Lua can run without in-game lag. Very experimental and not yet confirmed.
-- Fraction of WC3_OP_LIMIT that all processors combined may use up.
-- The leftover is reserved for the game's internal machinations and for triggers not registered with any processor.
local GLOBAL_PROCESS_RATIO = 0.8 -- from 0.0 to 1.0
local PROCESSOR_PERIOD = 0.02
...
local globalProcessOpLimit = WC3_OP_LIMIT*GLOBAL_PROCESS_RATIO*PROCESSOR_PERIOD
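As a rough worked example of the arithmetic above: with these constants the shared budget comes out at about 26,667 operations per tick. The sketch below reuses the same constants; the `splitBudget` helper is hypothetical and only mirrors what `refreshProcessorsOpLimits` does with the processor ratios.

```lua
-- Constants copied from the TaskProcessor snippet above.
local WC3_OP_LIMIT = 1666666
local GLOBAL_PROCESS_RATIO = 0.8
local PROCESSOR_PERIOD = 0.02

-- Operations that all processors together may spend in one 0.02 second tick.
local globalProcessOpLimit = WC3_OP_LIMIT * GLOBAL_PROCESS_RATIO * PROCESSOR_PERIOD

-- Hypothetical helper (not part of the library): split the shared budget
-- between processors in proportion to their ratios.
local function splitBudget(ratios)
    local total = 0
    for _, ratio in ipairs(ratios) do
        total = total + ratio
    end
    local limits = {}
    for i, ratio in ipairs(ratios) do
        limits[i] = globalProcessOpLimit * (ratio / total)
    end
    return limits
end

-- Two processors created with ratios 1 and 3 get 1/4 and 3/4 of the budget.
local limits = splitBudget({ 1, 3 })
print(string.format("%.1f ops per tick in total", globalProcessOpLimit))
print(string.format("%.1f and %.1f ops for ratios 1 and 3", limits[1], limits[2]))
```

Creating another processor later shrinks every existing processor's share, since the ratios are always normalised against their sum.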
Note: delay is specified in game ticks, i.e. how many 0.02-second increments have passed since the task was enqueued (or, for a periodic task, since its previous run).
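If you'd rather work in seconds, the conversion is a single multiplication. This is a minimal sketch; `ticksToSeconds` is a hypothetical helper and not part of the library.

```lua
local PROCESSOR_PERIOD = 0.02 -- same value as in TaskProcessor

-- Hypothetical helper: convert a delay in ticks back to seconds.
local function ticksToSeconds(delay)
    return delay * PROCESSOR_PERIOD
end

-- A task that waited 5 ticks waited about 0.1 seconds.
print(ticksToSeconds(5))
```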
Lua:
myProcessor = Processor.create(1)
myProcessor:enqueueTask( -- enqueue a one-shot task to be executed as soon as possible.
---@param delay number
---@return any value to be passed to subscribed function
function(delay)
-- do stuff here
end, FUNCTION_OP_COUNT):subscribe(
---@param value any
---@param delay number
function(value, delay)
-- called with the task's return value once the task has run
end,
---@param message string
---@param delay number
function(message, delay)
-- called if the task threw an error
end,
---@param delay number
function(delay)
-- called when the task has completed
end
)
myProcessor:enqueueTask( -- enqueue a periodic task
---@param delay number
---@return boolean done periodic task exit condition: return false to run again, any other value (including nil) ends the task
function(delay)
-- do stuff here
end, FUNCTION_OP_COUNT, PERIOD):subscribe(
---@param value any
---@param delay number
function(value, delay)
-- called with the last returned value once the task is done
end,
---@param message string
---@param delay number
function(message, delay)
-- called if the task threw an error
end,
---@param delay number
function(delay)
-- called when the task has completed
end
)
FUNCTION_OP_COUNT here is a placeholder that the developer must work out for each function. The TaskProcessor uses it to determine how many tasks it can fit into one execution bucket.
Each function is bound to have a different op count. You can use a single constant everywhere, but it probably won't be optimal.
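To illustrate why the op count matters, here is a standalone first-fit sketch of the bucket idea. The budget and op counts are made-up numbers, and the library's own loop may choose a different bucket than this one does.

```lua
-- Hypothetical numbers for illustration only.
local OP_LIMIT = 1000 -- per-tick budget of one processor
local buckets = {}    -- each bucket: { opCount = number, tasks = { names } }

-- Put the task into the first bucket that still has room,
-- or open a new bucket if none has.
local function enqueue(name, opCount)
    for index, bucket in ipairs(buckets) do
        if bucket.opCount + opCount <= OP_LIMIT then
            bucket.opCount = bucket.opCount + opCount
            table.insert(bucket.tasks, name)
            return index
        end
    end
    table.insert(buckets, { opCount = opCount, tasks = { name } })
    return #buckets
end

enqueue("a", 600)
enqueue("b", 600) -- does not fit next to "a", so it opens a second bucket
enqueue("c", 300) -- still fits next to "a"

for index, bucket in ipairs(buckets) do
    print(index, bucket.opCount, table.concat(bucket.tasks, ","))
end
```

Since one bucket is processed per tick, overestimating op counts spreads tasks over more ticks than necessary, while underestimating them packs more work into a tick than the budget was meant to allow.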
Here's an example of converting simple code into a more complicated version that uses the processor.
Lua:
-- Before: everything runs immediately inside the ForGroup callback.
---@param rect rect
function damageUnitsInRect(rect)
local group = CreateGroup()
GroupEnumUnitsInRect(group, rect, nil)
ForGroup(group, function ()
local unit = GetEnumUnit()
local amount = someReallyComplicatedDamageCalculationOrSomething(unit)
UnitDamageTarget(unit, unit, amount, true, false, ATTACK_TYPE_CHAOS, DAMAGE_TYPE_DEATH, WEAPON_TYPE_WHOKNOWS)
end)
DestroyGroup(group)
end
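-- After: the expensive calculation is handed to the processor and the damage is applied in the subscriber.
---@param rect rect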
function damageUnitsInRect(rect)
local group = CreateGroup()
GroupEnumUnitsInRect(group, rect, nil)
ForGroup(group, function ()
local unit = GetEnumUnit()
myProcessor:enqueueTask(function()
return someReallyComplicatedDamageCalculationOrSomething(unit)
end, OP_COUNT):subscribe(
function (damageAmount, delay)
UnitDamageTarget(unit, unit, damageAmount, true, false, ATTACK_TYPE_CHAOS, DAMAGE_TYPE_DEATH, WEAPON_TYPE_WHOKNOWS)
end)
end)
DestroyGroup(group)
end
This is still an unfinished resource as it's missing features like:
- Task priority
- Merging task buckets
- Actual handling of periodic tasks. The PERIOD parameter is pretty much useless for now, as the task is repeated every 0.02 seconds anyway.
- Ability to use actual scheduling strategies instead of just a FIFO approach
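One possible way to honour the period, sketched as a standalone simulation rather than a patch to the library: give each periodic task a `nextRun` tick and skip it until that tick is reached. The `createPeriodic` and `tick` names and the task fields are hypothetical.

```lua
local PROCESSOR_PERIOD = 0.02 -- same value as in TaskProcessor

-- Hypothetical task record: the period is converted from seconds to whole ticks.
local function createPeriodic(fn, period, currentTick)
    local periodTicks = math.max(1, math.floor(period / PROCESSOR_PERIOD + 0.5))
    return { fn = fn, periodTicks = periodTicks, nextRun = currentTick + periodTicks }
end

-- Run the task only when it is due.
-- Returns true while the task should stay queued, false once it is done.
local function tick(task, currentTick)
    if currentTick < task.nextRun then
        return true -- not due yet, skip this tick
    end
    local done = task.fn()
    if done then
        return false
    end
    task.nextRun = currentTick + task.periodTicks
    return true
end

local runs = 0
local task = createPeriodic(function()
    runs = runs + 1
    return runs == 3 -- finish after the third execution
end, 0.1, 0)

local ticksUsed = 0
for currentTick = 1, 100 do
    ticksUsed = currentTick
    if not tick(task, currentTick) then
        break
    end
end
print(runs, ticksUsed)
```

With a 0.1 second period the task runs on every fifth tick instead of every tick. A skipped task would not need to count against the bucket's op budget for that tick, which is one reason merging buckets is on the list as well.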
A list of op counts for natives is also still needed.
Attached to this post is a heavily modified version of Chopinski's Missile System that uses this library as a proof of concept. There's no apparent difference in-game between the original system and this one.