• 🏆 Texturing Contest #33 is OPEN! Contestants must re-texture a SD unit model found in-game (Warcraft 3 Classic), recreating the unit into a peaceful NPC version. 🔗Click here to enter!
  • It's time for the first HD Modeling Contest of 2024. Join the theme discussion for Hive's HD Modeling Contest #6! Click here to post your idea!

[Lua] SyncStream

Level 8
Joined
Jan 23, 2015
Messages
121
With the discussion around syncing lots of data and the lag/desync limitations that come with it, I've decided to solve those issues by spreading the sync process over time with timers. So I created this system. I have no benchmarks to prove it, but I believe it is better than anything I've seen on the matter so far :grin:

I have not tested it in real multiplayer yet, but with two clients on the same PC it synced data just fine, including the entire Bee Movie script. I'll polish the code and documentation later; it already works, and I'm open to suggestions and criticism.

Lua:
if Debug then Debug.beginFile "SyncStream" end
--[[
    SyncStream v1

    Provides functionality designed to safely sync arbitrary amounts of data.
    Uses timers to spread BlzSendSyncData calls over time.

    API:

        SyncStreams[player_id]:sync(localData: string?, callback: fun(data: string))
            - Syncs the localData string then calls the callback with it
              NOTE: localData is used only for the Local Player

        SyncStreams[player_id]:sync(getLocalData: fun():string?, callback: fun(data: string))
            - An overload that uses a function to get the localData string
              NOTE: the function is used only for the Local Player, so beware of desyncs

    Requirements:
        Base64 by Trokkin                               @ https://www.hiveworkshop.com/threads/347907/

    Optional requirements:
        DebugUtils by Eikonium                          @ https://www.hiveworkshop.com/threads/330758/
        Total Initialization by Bribe                   @ https://www.hiveworkshop.com/threads/317099/

    Updated: 14 May 2023

--]]
OnInit("SyncStream", function()
    -- Prefix passed to BlzSendSyncData by the sender and registered on the receiver trigger.
    local SYNC_PREFIX = "F"
    -- Transfer tuning and header bit layout shared by the sender and the receiver.
    --   SIZE            - payload characters carried by one data package
    --   PER_TICK        - upper bound of packages popped by the sender timer per tick
    --   TICK_PER_SECOND - frequency of the sender timer
    --   ID_BITS         - width of the promise id written at the start of every package
    --   LENGTH_BITS     - width of the total data length, written only in the header package (chunk 0)
    --   N_BITS          - width of the chunk index written after the promise id
    local PACKAGE = {
        SIZE = 200,
        PER_TICK = 32,
        TICK_PER_SECOND = 32,
        --> upper bound of data throughput is SIZE * PER_TICK * TICK_PER_SECOND bytes/second:
        --> 200*32*32 = 204800 bytes/second (~200 KB/s)

        ID_BITS = 24,
        LENGTH_BITS = 24,
        N_BITS = 18,
    }

    --[ SYNC QUEUE CLASS ]--

    --- Outgoing half of a transfer: the local player's data cut into packages.
    --- Index 0 is a virtual header package (it carries the total length and no payload),
    --- indices 1..#chunks are the payload packages.
    ---@class SyncQueue
    ---@field id integer id of the promise this queue feeds
    ---@field length integer total length of the data in bytes
    ---@field chunks string[] payload slices of at most PACKAGE.SIZE characters
    ---@field next_chunk integer index of the next package to send (0 = header)
    local SyncQueue = {}
    SyncQueue.__index = SyncQueue

    --- Slices the data into PACKAGE.SIZE-sized chunks and wraps them into a queue.
    ---@param id integer The id of the promise
    ---@param data string Data to be sent from the local player
    ---@return SyncQueue
    function SyncQueue.create(id, data)
        local total = #data
        local slices = {}
        local count = 0
        local offset = 1
        while offset <= total do
            count = count + 1
            slices[count] = data:sub(offset, offset + PACKAGE.SIZE - 1)
            offset = offset + PACKAGE.SIZE
        end
        return setmetatable({
            id = id,
            chunks = slices,
            next_chunk = 0,
            length = total,
        }, SyncQueue)
    end

    --- Tries to send the next pending package; does nothing once every package was sent.
    --- The cursor only moves forward when BlzSendSyncData reports success, so a rejected
    --- package is retried by the next call.
    function SyncQueue:pop()
        local index = self.next_chunk
        if index > #self.chunks then
            return
        end

        -- Every package starts with the encoded promise id and chunk index.
        local encoder = Base64.Encoder.create()
        encoder:writeBitString(self.id, PACKAGE.ID_BITS)
        encoder:writeBitString(index, PACKAGE.N_BITS)

        local is_header = index == 0
        if is_header then
            -- The header additionally announces the total data length.
            encoder:writeBitString(self.length, PACKAGE.LENGTH_BITS)
        end

        local package = encoder:buildString()
        if not is_header then
            -- Payload is appended raw, right after the encoded prefix.
            package = package .. self.chunks[index]
        end

        -- print(">", index, package)
        if BlzSendSyncData(SYNC_PREFIX, package) then
            self.next_chunk = index + 1
        end
    end

    --[ PROMISE CLASS ]--

    -- Timer handle used for the periodic sender loop.
    local syncTimer = CreateTimer()

    --- Receiving half of a transfer: buffers incoming chunks by index and fires the
    --- callback once the announced amount of data has arrived.
    ---@class Promise
    ---@field id integer
    ---@field callback fun(data: string)
    ---@field length integer? total data length; nil until it has been announced
    ---@field next_chunk integer lowest chunk index that has not been received yet
    ---@field chunks string[] received payload chunks, keyed by chunk index
    ---@field queue SyncQueue? outgoing queue; nil when there is nothing (left) to send
    local Promise = {}
    Promise.__index = Promise

    ---@param id integer The id of the promise
    ---@param callback fun(s: string) The function to be called with the received data
    ---@return Promise
    function Promise.create(id, callback)
        return setmetatable({
            id = id,
            callback = callback,
            chunks = {},
            next_chunk = 0,
            length = nil,
            queue = nil,
        }, Promise)
    end

    --- Stores one received payload chunk and completes the promise when all data is present.
    --- Packages arriving after completion are ignored.
    ---@param chunk_id integer index of the chunk (1-based)
    ---@param package string payload of the chunk
    function Promise:consume(chunk_id, package)
        -- Already complete: every byte announced by `length` is covered by chunks 1..next_chunk-1.
        if self.length and self.length <= (self.next_chunk - 1) * PACKAGE.SIZE then
            return
        end

        -- print("<", chunk_id, package)
        self.chunks[chunk_id] = package
        -- Advance over every contiguous chunk that is already buffered. The cursor must not be
        -- capped at `chunk_id`: when a gap is filled, chunks buffered beyond it have to be
        -- counted too, otherwise the transfer would never be recognized as complete.
        while self.chunks[self.next_chunk] ~= nil do
            self.next_chunk = self.next_chunk + 1
        end
        if self.length and self.length <= (self.next_chunk - 1) * PACKAGE.SIZE then
            self.callback(table.concat(self.chunks))
        end
    end

    --[ SYNC STREAM CLASS ]--

    local syncTrigger ---@type trigger
    local localPlayer = GetLocalPlayer()
    local streams = {} ---@type SyncStream[]

    --- Sends or receives one player's data asymmetrically: only the owning client
    --- provides the data, every client registers a promise for it.
    ---@class SyncStream
    ---@field owner player
    ---@field is_local boolean true on the client whose local player owns the stream
    ---@field next_promise integer index of the first promise that may still have data to send
    ---@field promises Promise[]
    local SyncStream = {}
    SyncStream.__index = SyncStream

    ---@param owner player The player owning the data from the stream
    ---@return SyncStream
    local function CreateSyncStream(owner)
        return setmetatable({
            owner = owner,
            is_local = owner == localPlayer,
            next_promise = 1,
            promises = {}
        }, SyncStream)
    end

    --- Registers a new promise on this stream. On the owning client the data is also
    --- queued for sending; on every other client the first argument is ignored.
    ---@param getLocalData string | fun():string
    ---@param onDataSynced fun(data: string)
    function SyncStream:sync(getLocalData, onDataSynced)
        local promise = Promise.create(#self.promises + 1, onDataSynced)

        if self.is_local then
            local data = getLocalData
            if type(data) == "function" then
                -- The getter runs on the owning client only.
                data = data()
            end
            if type(data) ~= "string" then
                -- Send a diagnostic string instead, so a promise is still registered and fed.
                data = "sync error: bad data type provided " .. type(data)
            end
            promise.queue = SyncQueue.create(promise.id, data)
        end

        self.promises[promise.id] = promise
    end

    -- One stream per player slot, indexed by player id.
    for i = 0, bj_MAX_PLAYER_SLOTS - 1 do
        streams[i] = CreateSyncStream(Player(i))
    end
    -- Intentionally global: this is the public entry point of the library.
    SyncStreams = streams

    -- Starts the periodic sender for the local player's stream and registers the
    -- receiver trigger that feeds incoming packages into the matching promises.
    OnInit.trig(function()
        --- Setup sender timer
        local s = streams[GetPlayerId(GetLocalPlayer())]
        if s == nil or not s.is_local then
            print("SyncStream panic: local stream is not local")
            return
        end
        TimerStart(syncTimer, 1 / PACKAGE.TICK_PER_SECOND, true, function()
            for _ = 1, PACKAGE.PER_TICK do
                -- Skip promises that have nothing (left) to send.
                while s.next_promise <= #s.promises and s.promises[s.next_promise].queue == nil do
                    s.next_promise = s.next_promise + 1
                end
                local pending = s.promises[s.next_promise]
                if pending == nil or pending.queue == nil then
                    return
                end
                local q = pending.queue
                q:pop()
                if q.next_chunk > #q.chunks then
                    -- Everything was sent; drop the queue so the promise is skipped from now on.
                    pending.queue = nil
                end
            end
        end)

        --- Setup receiver trigger
        syncTrigger = CreateTrigger()
        for i = 0, bj_MAX_PLAYER_SLOTS - 1 do
            BlzTriggerRegisterPlayerSyncEvent(syncTrigger, Player(i), SYNC_PREFIX, false)
        end
        TriggerAddAction(syncTrigger, function()
            local prefix = BlzGetTriggerSyncPrefix()
            if prefix ~= SYNC_PREFIX then
                print("SyncStream panic: invalid sync prefix, expected '" .. SYNC_PREFIX .. "' but got '" .. prefix .. "'")
                -- The package does not follow this protocol; decoding it would be meaningless.
                return
            end
            local owner = GetTriggerPlayer()
            local package = BlzGetTriggerSyncData()
            local stream = streams[GetPlayerId(owner)]
            if stream == nil then
                print("SyncStream panic: no stream found for player " .. GetPlayerId(owner))
                return
            end

            local decoder = Base64.Decoder.create(package)
            local id = decoder:readBitString(PACKAGE.ID_BITS)
            local promise = stream.promises[id]
            if not promise then
                print("SyncStream panic: no promise found for id", id)
                return
            end
            local chunk_id = decoder:readBitString(PACKAGE.N_BITS)
            if chunk_id == 0 then
                -- Header package: announces the total length, carries no payload.
                local data_length = decoder:readBitString(PACKAGE.LENGTH_BITS)
                promise.length = data_length
                promise.next_chunk = 1
                if data_length == 0 then
                    -- No payload package follows an empty transfer, so it has to be
                    -- completed here or the callback would never run.
                    promise.callback("")
                end
                return
            end
            -- Payload package: everything after the encoded prefix is raw data.
            promise:consume(chunk_id, package:sub(decoder.pointer + 1))
        end)
    end)
end)
if Debug then Debug.endFile() end

Lua:
--- Demo: streams the Bee Movie script stored on Player(0)'s machine through
--- SyncStreams[0] and prints the first 16 characters of each received line, one per 1/32 s.
--- Must be executed by every client.
function testStream_BeeMovie()
    -- Synchronous context
    local data = nil
    if GetLocalPlayer() == Player(0) then
        -- assuming bee movie script was already saved by Player(0) into the file using my FileIO lib:
        -- https://www.hiveworkshop.com/threads/fileio-lua-optimized.347049/
        -- https://github.com/ScrewTheTrees/SaveLoadBigData/blob/master/src/SaveLoad/EntireBeeMovieScript.ts
        data = FileIO.Load("beeMovie.pld")
        if data == nil then
            print("No Bee Movie script found!")
            -- Do not return here: this branch runs on Player(0)'s client only, and skipping
            -- the sync() call below would leave this client with fewer promises than the
            -- others, shifting the promise ids of every later sync on this stream.
            data = ""
        end
    end
    -- All clients must execute this function but only Player(0)'s 'data' will be used.
    SyncStreams[0]:sync(data, function(my_str) -- use received data as synchronized some time after
        local lines = {}
        for line in string.gmatch(my_str, "[^\n]+") do
            lines[#lines + 1] = line
        end
        if #lines == 0 then
            -- Nothing to print; starting the timer would index a missing line.
            return
        end
        local timer = CreateTimer()
        local i = 1
        TimerStart(timer, 1/32, true, function()
            print("!", i, lines[i]:sub(1, 16))
            i = i + 1
            if i > #lines then
                DestroyTimer(timer)
            end
        end)
    end)
end

--- Round-trip test: generates `dataset_length` random bit strings (1..31 bits each),
--- syncs each one through SyncStreams[0] together with its index, and prints whether
--- the decoded index and value match what was enqueued.
---@param dataset_length integer number of random entries to generate and sync
function test_complex(dataset_length)
    local dataset, labels = {}, {}
    for n = 1, dataset_length do
        local len = math.random(31)
        local value = math.random(1 << len) - 1
        dataset[n] = { len = len, value = value }
        labels[n] = string.format("%d:0x%x", len, value)
    end

    print(table.concat(labels, ', '))

    for i = 1, #dataset do
        local entry = dataset[i]

        -- Payload layout: 24-bit entry index followed by the value in `entry.len` bits.
        local encoder = Base64.Encoder.create()
        encoder:writeBitString(i, 24)
        encoder:writeBitString(entry.value, entry.len)

        SyncStreams[0]:sync(encoder:buildString(), function(recv)
            local decoder = Base64.Decoder.create(recv)
            local j = decoder:readBitString(24)
            local expected = dataset[j]
            local value = decoder:readBitString(expected.len)
            if j == i and value == expected.value then
                print (("success %d, 0x%x"):format(j, value))
            else
                print (("failure %d vs %d, 0x%x vs 0x%x"):format(i, j, value, expected.value))
            end
        end)
        print (("enqueued %d, 0x%x"):format(i, entry.value))
    end
end

Requirements:

Optional requirements:
 
Last edited:
Top