I just pushed the code for my replication changes on launchpad:

$ bzr branch lp:~jan-kneschke/mysql-proxy/replication

The presentation should be available ... soon.

One of the first examples is about filtering binlogs. If you want to remove all ALTER statements from the replication stream (or in this case the binlog files) you can just iterate the binlog and copy everything to a new binlog, but the ALTER statement

local binlog = require("mysql.binlog")
local tokenizer = require("proxy.tokenizer")

local f_in = assert(binlog.open("/tmp/binlog-test.log"))
local f_out = assert(binlog.open("/tmp/binlog-test-filtered.log", "w"))
print("filtering /tmp/binlog-test.log to /tmp/binlog-test-filtered.log")

for event in f_in:next() do
    if event.type == "QUERY_EVENT" then
            local tokens = tokenizer.tokenize(event.query.query)
            local first_token = tokenizer.first_stmt_token(tokens)

            if first_token.token_name == "TK_SQL_ALTER" then
                    print("removing " .. event.query.query)
            else
                    -- log it
                    f_out:append(event)
            end
    else
            f_out:append(event)
    end
end

f_in:close()
f_out:close()

Straight forward.

You can also merge binlogs like it happens in sharded setups:

-- open the files 
local in_files = {
    assert(binlog.open("/tmp/binlog-merge-1.log")),
    assert(binlog.open("/tmp/binlog-merge-2.log"))
}
f_merged = assert(binlog.open("/tmp/binlog-merged.log", "w"))
print("merging to /tmp/binlog-merge.log")

local iterators = {}

-- get the iterators for all the binlogs
for i, f in ipairs(in_files) do
    iterators[i] = f:next()
end

local cur_events = {}
for i, f in ipairs(iterators) do
    cur_events[i] = f()
end

--
-- copy the oldest timestamp from all the binlogs to the out-binlog
repeat
    -- get the event with the oldest timestamp
    local o_ndx
    local o_ts
    for i = 1, #cur_events do
            local event = cur_events[i]
            if event and (not o_ts or event.timestamp < o_ts) then
                    o_ndx = i
                    o_ts = event.timestamp
            end
    end

    -- append the oldest event
    if o_ndx then
            f_merged:append(cur_events[o_ndx])
            cur_events[o_ndx] = iterators[o_ndx]()
    end
until not o_ndx


for i, f in ipairs(in_files) do
    f:close()
end

f_merged:close()

That's just the foundation for more fun stuff like: "synchronized replication". More of that in the session.


Comments

Enable javascript to load comments.