- Joined
- Jan 23, 2015
- Messages
- 124
With the discussion around syncing lots of data and its limitations of having lags/desyncs, I've decided to solve any lag or desync issues by spreading the sync process over time with timers. So I created this system. Without proofs, it is better than anything I've seen on the matter yet 
I did not test it in multiplayer yet, but on same PC with two clients I've got it syncing data just fine, including syncing the entire Bee movie script. I'll polish the code and documentation later, it's fine already and I'm open to suggestions and criticism.
Requirements:
Optional requirements:

I did not test it in multiplayer yet, but on same PC with two clients I've got it syncing data just fine, including syncing the entire Bee movie script. I'll polish the code and documentation later, it's fine already and I'm open to suggestions and criticism.
Lua:
if Debug then Debug.beginFile "SyncStream" end
--[[
SyncStream v1
Provides functionality to designed to safely sync arbitrary amounts of data.
Uses timers to spread BlzSendSyncData calls over time.
API:
SyncStream[player_id]:sync(localData: string?, callback: fun(data: string))
- Syncs the localData string then calls the callback with it
NOTE: localData is used onlyQueued for the Local Player
SyncStream[player_id]:sync(getLocalData: fun():string?, callback: fun(data: string))
- An overload that uses a function to get the localData string
NOTE: the function is used only for the Local Player, so beware of desyncs
Requirements:
Base64 by Trokkin @ https://www.hiveworkshop.com/threads/347907/
Optional requirements:
DebugUtils by Eikonium @ https://www.hiveworkshop.com/threads/330758/
Total Initialization by Bribe @ https://www.hiveworkshop.com/threads/317099/
Updated: 14 May 2023
--]]
OnInit("SyncStream", function()
local SYNC_PREFIX = "F"
local PACKAGE = {
SIZE = 200,
PER_TICK = 32,
TICK_PER_SECOND = 32,
--> data throughput is SIZE * PER_TICK * TICK_PER_SECOND bytes/second. 200*32*32 ~= 10kbps
ID_BITS = 24,
LENGTH_BITS = 24,
N_BITS = 18,
}
--[ SYNC QUEUE CLASS ]--
---@class SyncQueue
---@field id integer
---@field callback fun(data: string)
---@field length integer
---@field chunks string[]
---@field next_chunk integer
local SyncQueue = {}
SyncQueue.__index = SyncQueue
---@param id integer The id of the promise
---@param data string Data to be sent from the local player
function SyncQueue.create(id, data)
local queue = setmetatable({
id = id,
chunks = {},
next_chunk = 0,
length = #data
}, SyncQueue)
for i = 1, #data, PACKAGE.SIZE do
queue.chunks[#queue.chunks + 1] = data:sub(i, i + PACKAGE.SIZE - 1)
end
return queue
end
function SyncQueue:pop()
if self.next_chunk > #self.chunks then
return
end
local package
local encoder = Base64.Encoder.create()
encoder:writeBitString(self.id, PACKAGE.ID_BITS)
encoder:writeBitString(self.next_chunk, PACKAGE.N_BITS)
if self.next_chunk == 0 then
encoder:writeBitString(self.length, PACKAGE.LENGTH_BITS)
package = encoder:buildString()
else
package = encoder:buildString() .. self.chunks[self.next_chunk]
end
-- print(">", self.next_chunk, package)
if BlzSendSyncData(SYNC_PREFIX, package) then
self.next_chunk = self.next_chunk + 1
end
end
--[ PROMISE CLASS ]--
local syncTimer = CreateTimer()
---@class Promise
---@field id integer
---@field callback fun(data: string)
---@field length integer?
---@field next_chunk integer
---@field chunks string[]
---@field queue SyncQueue?
local Promise = {}
Promise.__index = Promise
---@param id integer The id of the promise
---@param callback fun(s: string) The function to be called with the received data
function Promise.create(id, callback)
return setmetatable({
id = id,
callback = callback,
chunks = {},
next_chunk = 0,
length = nil,
queue = nil,
}, Promise)
end
function Promise:consume(chunk_id, package)
if self.length and self.length <= (self.next_chunk - 1) * PACKAGE.SIZE then
return
end
-- print("<", chunk_id, package)
self.chunks[chunk_id] = package
while self.next_chunk <= chunk_id and self.chunks[self.next_chunk] ~= nil do
self.next_chunk = self.next_chunk + 1
end
if self.length and self.length <= (self.next_chunk - 1) * PACKAGE.SIZE then
self.callback(table.concat(self.chunks))
end
end
--[ SYNC STREAM CLASS ]--
local syncTrigger ---@type trigger
local localPlayer = GetLocalPlayer()
local localStream = GetLocalPlayer()
local streams = {} ---@type SyncStream[]
--- Sends or receives player's data assymentrically
---@class SyncStream
---@field owner player
---@field is_local boolean
---@field next_promise integer
---@field promises Promise[]
local SyncStream = {}
SyncStream.__index = SyncStream
---@param owner player The player owning the data from the stream
local function CreateSyncStream(owner)
return setmetatable({
owner = owner,
is_local = owner == localPlayer,
next_promise = 1,
promises = {}
}, SyncStream)
end
---
---@param getLocalData string | fun():string
---@param onDataSynced fun(data: string)
function SyncStream:sync(getLocalData, onDataSynced)
local promise = Promise.create(#self.promises + 1, onDataSynced)
if self.is_local then
if type(getLocalData) == "function" then
getLocalData = getLocalData()
end
if type(getLocalData) ~= "string" then
getLocalData = "sync error: bad data type provided " .. type(getLocalData)
end
promise.queue = SyncQueue.create(promise.id, getLocalData)
end
self.promises[promise.id] = promise
end
for i = 0, bj_MAX_PLAYER_SLOTS - 1 do
streams[i] = CreateSyncStream(Player(i))
end
SyncStreams = streams
OnInit.trig(function()
--- Setup sender timer
local s = streams[GetPlayerId(GetLocalPlayer())]
if not s.is_local then
print("SyncStream panic: local stream is not local")
return
end
TimerStart(syncTimer, 1 / PACKAGE.TICK_PER_SECOND, true, function()
for i = 1, PACKAGE.PER_TICK do
while s.next_promise <= #s.promises and s.promises[s.next_promise].queue == nil do
s.next_promise = s.next_promise + 1
end
if s.promises[s.next_promise] == nil then
return
end
local q = s.promises[s.next_promise].queue
if q == nil then
return
end
q:pop()
if q.next_chunk > #q.chunks then
s.promises[s.next_promise].queue = nil
end
end
end)
--- Setup receiver trigger
syncTrigger = CreateTrigger()
for i = 0, bj_MAX_PLAYER_SLOTS - 1 do
BlzTriggerRegisterPlayerSyncEvent(syncTrigger, Player(i), SYNC_PREFIX, false)
end
TriggerAddAction(syncTrigger, function()
local prefix = BlzGetTriggerSyncPrefix()
if prefix ~= SYNC_PREFIX then
print("SyncStream panic: invalid sync prefix, expected '" .. SYNC_PREFIX .. "' but got '" .. prefix .. "'")
end
local owner = GetTriggerPlayer()
local package = BlzGetTriggerSyncData()
local stream = streams[GetPlayerId(owner)]
if stream == nil then
print("SyncStream panic: no stream found for player '" .. SYNC_PREFIX .. "' but got '" .. prefix .. "'")
return
end
local decoder = Base64.Decoder.create(package)
local id = decoder:readBitString(PACKAGE.ID_BITS)
local promise = stream.promises[id]
if not promise then
print("SyncStream panic: no promise found for id", id)
return
end
local chunk_id = decoder:readBitString(PACKAGE.N_BITS)
if chunk_id == 0 then
local data_length = decoder:readBitString(PACKAGE.LENGTH_BITS)
promise.length = data_length
promise.next_chunk = 1
return
end
promise:consume(chunk_id, package:sub(decoder.pointer + 1))
end)
end)
end)
if Debug then Debug.endFile() end
Lua:
function testStream_BeeMovie()
-- Synchronous context
local data = nil
if GetLocalPlayer() == Player(0) then
-- assuming bee movie script was already saved by Player(0) into the file using my FileIO lib:
-- https://www.hiveworkshop.com/threads/fileio-lua-optimized.347049/
-- https://github.com/ScrewTheTrees/SaveLoadBigData/blob/master/src/SaveLoad/EntireBeeMovieScript.ts
data = FileIO.Load("beeMovie.pld")
if data == nil then
print("No Bee Movie script found!")
return ""
end
end
-- All clients must execute this function but only Player(0)'s 'data' will be used.
SyncStreams[0]:sync(data, function(my_str) -- use received data as synchronized some time after
local timer = CreateTimer()
local data = {}
for line in string.gmatch(my_str, "[^\n]+") do
data[#data+1] = line
end
local i = 1
TimerStart(timer, 1/32, true, function()
print("!", i, data[i]:sub(1, 16))
i = i + 1
if i > #data then
DestroyTimer(timer)
end
end)
end)
end
function test_complex(dataset_length)
local dataset = {}
local dataset_display = {}
for i = 1, dataset_length do
local len = math.random(31)
local value = math.random(1 << len) - 1
dataset[#dataset + 1] = {
len = len,
value = value
}
dataset_display[#dataset_display + 1] = ("%d:0x%x"):format(len, value)
end
print(table.concat(dataset_display, ', '))
for i, data in ipairs(dataset) do
local e = Base64.Encoder.create()
e:writeBitString(i, 24)
e:writeBitString(data.value, data.len)
local s = e:buildString()
SyncStreams[0]:sync(s, function(recv)
local decoder = Base64.Decoder.create(recv)
local j = decoder:readBitString(24)
local set = dataset[j]
local value = decoder:readBitString(set.len)
local result = j == i and value == set.value
if result then
print (("success %d, 0x%x"):format(j, value))
else
print (("failure %d vs %d, 0x%x vs 0x%x"):format(i, j, value, set.value))
end
end)
print (("enqueued %d, 0x%x"):format(i, data.value))
end
end
Requirements:
Optional requirements:
- [Lua] - Debug Utils (Ingame Console etc.) by @Eikonium
- [Lua] - Total Initialization by @Bribe (remove/replace OnInit if you don't use it)
Last edited: