I just pushed the code for my replication changes on launchpad:
$ bzr branch lp:~jan-kneschke/mysql-proxy/replication
The presentation should be available ... soon.
One of the first examples is about filtering binlogs. If you want to remove all ALTER
statements from the replication stream (or, in this case, from the binlog files), you can simply iterate over the binlog and copy every event except the ALTER
statements to a new binlog:
local binlog = require("mysql.binlog")
local tokenizer = require("proxy.tokenizer")
-- Copy every event from the input binlog to a new binlog, leaving out
-- the query events whose first statement token is ALTER.
local f_in = assert(binlog.open("/tmp/binlog-test.log"))
local f_out = assert(binlog.open("/tmp/binlog-test-filtered.log", "w"))

print("filtering /tmp/binlog-test.log to /tmp/binlog-test-filtered.log")

for event in f_in:next() do
  -- Only QUERY_EVENT events are tokenized; every other event type is
  -- copied to the output unchanged.
  local is_alter = false
  if event.type == "QUERY_EVENT" then
    local stmt_tokens = tokenizer.tokenize(event.query.query)
    local leading = tokenizer.first_stmt_token(stmt_tokens)
    is_alter = (leading.token_name == "TK_SQL_ALTER")
  end

  if is_alter then
    -- dropped: report it instead of writing it to the output
    print("removing " .. event.query.query)
  else
    -- log it
    f_out:append(event)
  end
end

f_in:close()
f_out:close()
Straightforward.
You can also merge binlogs, as happens in sharded setups:
-- Merge several binlogs into a single binlog, always appending the
-- pending event with the smallest timestamp next.

-- open the files
local in_paths = {
  "/tmp/binlog-merge-1.log",
  "/tmp/binlog-merge-2.log",
}
local in_files = {}
for i, path in ipairs(in_paths) do
  in_files[i] = assert(binlog.open(path))
end

-- keep the output handle local; a bare assignment would create a global
local merged_path = "/tmp/binlog-merged.log"
local f_merged = assert(binlog.open(merged_path, "w"))
-- report the path that is actually written
print("merging to " .. merged_path)

-- get the iterators for all the binlogs
local iterators = {}
for i, f in ipairs(in_files) do
  iterators[i] = f:next()
end

-- One slot per input. A slot becomes nil once its input has no more events,
-- so cur_events may contain holes and "#cur_events" must not be used as the
-- loop bound; the iterator count is used instead.
local n_inputs = #iterators

-- read the first pending event of every input
local cur_events = {}
for i = 1, n_inputs do
  cur_events[i] = iterators[i]()
end

--
-- copy the oldest timestamp from all the binlogs to the out-binlog
repeat
  -- get the event with the oldest timestamp
  local o_ndx
  local o_ts
  for i = 1, n_inputs do
    local event = cur_events[i]
    if event and (not o_ts or event.timestamp < o_ts) then
      o_ndx = i
      o_ts = event.timestamp
    end
  end

  -- append the oldest event and fetch the next one from the same input
  if o_ndx then
    f_merged:append(cur_events[o_ndx])
    cur_events[o_ndx] = iterators[o_ndx]()
  end
until not o_ndx -- no input has a pending event left

for _, f in ipairs(in_files) do
  f:close()
end
f_merged:close()
That's just the foundation for more fun stuff like "synchronized replication". More on that in the session.
Comments