The idea is as old as replication itself: how do you know which slave is the most current?
You can use the Master_Binlog_Pos to guess which one is most up to date, but which transaction does this binlog position match?
With the proxy you can add a global transaction ID to your setup, if you let it inject some information into the stream.
The idea is simple and is documented in various places.
Create a replicated MEMORY table with a single BIGINT UNSIGNED column and increment it at the end of each transaction.
CREATE TABLE trx (
  trx_id BIGINT UNSIGNED NOT NULL
) ENGINE=memory;

INSERT INTO trx VALUES ( 0 );
Whenever you commit a transaction, UPDATE the trx_id field:
UPDATE trx SET trx_id = trx_id + 1
Use cases:
- identify which slave is most current and switch to it in case of master failure
- identify whether slaves are current enough to SELECT from them
- identify the most current slave to send the load to it in R/W splitting
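The first two use cases boil down to comparing counters. Below is a minimal sketch in plain Lua, assuming you have already read trx_id from the master and from each slave (for example with SELECT trx_id FROM repl.trx). The function names, the slave names and the max_lag threshold are made up for illustration and are not part of the proxy.

```lua
-- Hypothetical helpers, not part of the proxy API.

-- trx_ids maps a slave name to the trx_id last read from that slave.
-- Returns the name and trx_id of the slave with the highest counter.
function most_current_slave(trx_ids)
    local best_name, best_id = nil, -1
    for name, id in pairs(trx_ids) do
        -- break ties by name so the result is deterministic
        if id > best_id or (id == best_id and name < best_name) then
            best_name, best_id = name, id
        end
    end
    return best_name, best_id
end

-- A slave is "current enough" if it is at most max_lag transactions
-- behind the master.
function is_current_enough(master_id, slave_id, max_lag)
    return master_id - slave_id <= max_lag
end

local slaves = { ["slave-a"] = 32, ["slave-b"] = 34, ["slave-c"] = 33 }
print(most_current_slave(slaves))
print(is_current_enough(34, 33, 1))
```

With the sample values, slave-b (trx_id 34) is picked as the most current slave.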
The wonders of Query Injection
As it might not be simple to find all the places in your app where you close a transaction, you can use the proxy to analyze the traffic and inject an UPDATE in the right places.
The MySQL protocol is nice to us and announces whether we are in a transaction by setting a status bit in the result-set of each query. As soon as the flag is 0 we know that the transaction is finished.
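On the wire this is the SERVER_STATUS_IN_TRANS bit (0x0001) of the two-byte server_status field. The scripts below read it through the flags of the result-set instead. As a small sketch of what the flag means, this is how the bit could be tested on a raw status value in plain Lua 5.1, which has no bit operators. The helper name has_status_bit() is made up for illustration.

```lua
-- Status bits as defined by the MySQL client/server protocol
SERVER_STATUS_IN_TRANS   = 1  -- 0x0001: a transaction is open
SERVER_STATUS_AUTOCOMMIT = 2  -- 0x0002: auto-commit mode is on

-- has_status_bit() is a hypothetical helper. The bit is tested with
-- integer arithmetic; bit has to be a power of two.
function has_status_bit(server_status, bit)
    return math.floor(server_status / bit) % 2 == 1
end

-- auto-commit on, no open transaction
print(has_status_bit(0x0002, SERVER_STATUS_IN_TRANS))
-- auto-commit on, inside a transaction
print(has_status_bit(0x0003, SERVER_STATUS_IN_TRANS))
```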
As configuration we have to know the name of the database, the table and the field to increment:
trx_db = "repl"
trx_table = "trx"
trx_table_fld = "trx_id"
Next we have to send all queries through the Lua script to track their result-sets:
function read_query(packet)
  -- send all queries to read_query_result()
  proxy.queries:append(1, packet)

  return proxy.PROXY_SEND_QUERY
end
As the last step we check the result-set of each query to see whether a transaction got closed:
function read_query_result(inj)
  if inj.id ~= 1 then
    -- ignore the result of all injected queries
    return proxy.PROXY_IGNORE_RESULT
  end

  local res = assert(inj.resultset)
  local flags = res.flags

  -- let's hope the query had a resultset
  if res.query_status == proxy.MYSQLD_PACKET_ERR then
    return
  end

  if flags.in_trans == 0 then
    -- we left the transaction, increment the trx-id
    if trx_db ~= proxy.connection.default_db then
      -- switch to the transaction db
      proxy.queries:append(2,
        string.char(proxy.COM_INIT_DB) .. trx_db)
    end

    proxy.queries:append(3, string.char(proxy.COM_QUERY) ..
      "UPDATE " .. trx_table ..
      " SET " .. trx_table_fld .. " = " .. trx_table_fld .. " + 1")

    if proxy.connection.default_db and trx_db ~= proxy.connection.default_db then
      -- and back to the DB of the user
      proxy.queries:append(4,
        string.char(proxy.COM_INIT_DB) .. proxy.connection.default_db)
    end
  end
end
The function does a bit more and switches between the user-db and the db of the trx-id table in case they are different.
root@127.0.0.1:4040> select * from repl.trx;
+--------+
| trx_id |
+--------+
|     32 |
+--------+
root@127.0.0.1:4040> select * from repl.trx;
+--------+
| trx_id |
+--------+
|     33 |
+--------+
We are in auto-commit mode and the trx table is not transactional. Each SELECT is a transaction of its own and increments the trx-counter.
As soon as we open a transaction explicitly, the trx-counter isn't incremented until the transaction ends.
root@127.0.0.1:4040> begin;
root@127.0.0.1:4040> select * from repl.trx;
+--------+
| trx_id |
+--------+
|     34 |
+--------+
root@127.0.0.1:4040> select * from repl.trx;
+--------+
| trx_id |
+--------+
|     34 |
+--------+
root@127.0.0.1:4040> commit;
TRX ID as part of the transaction
The above implementation shows the basic idea and injects the TRX_ID update after the end of the transaction. You can also invert the idea and make the TRX_ID part of the COMMIT by updating the ID before you send the COMMIT.
In read_query() you check for COMMIT and prepend the UPDATE to it. I leave the implementation to the imagination of the reader.
Misuse of the AFFECTED_ROWS()
I only pursue this idea to point out a small trick: we can tweak the protocol a bit to transfer the TRX_ID back to the client without actually sending a SELECT to read it from the table.
At the end of each non-SELECT query we get an OK packet, which is made up of the following bytes:
Bytes                       Name
-----                       ----
1   (Length Coded Binary)   field_count, always = 0
1-9 (Length Coded Binary)   affected_rows
1-9 (Length Coded Binary)   insert_id
2                           server_status
2                           warning_count
n   (until end of packet)   message
For COMMIT this looks like:
| result.packet = 00 00 00 02 00 00 00
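To make the layout concrete, here is a minimal sketch in plain Lua that splits such an OK packet into its fields. It assumes the packet is given without the 4-byte packet header, as in the dump above. The function names are made up for illustration and are not part of the proxy API.

```lua
-- Decode a Length Coded Binary starting at byte position pos.
-- Returns the value and the position of the next field.
-- Note: 8-byte values above 2^53 lose precision with Lua 5.1 numbers.
function read_lenenc(data, pos)
    local first = data:byte(pos)
    if first < 251 then
        return first, pos + 1
    end
    -- 252, 253 and 254 announce a 2, 3 or 8 byte little-endian integer
    local sizes = { [252] = 2, [253] = 3, [254] = 8 }
    local size = assert(sizes[first], "NULL marker (251) or invalid prefix")
    local value = 0
    for i = size, 1, -1 do
        value = value * 256 + data:byte(pos + i)
    end
    return value, pos + 1 + size
end

-- Read a little-endian 16 bit integer
function read_uint16(data, pos)
    local lo, hi = data:byte(pos, pos + 1)
    return lo + hi * 256, pos + 2
end

-- Split an OK packet (without the packet header) into its fields
function parse_ok_packet(data)
    local ok, pos = {}, 1
    ok.field_count,   pos = read_lenenc(data, pos)
    ok.affected_rows, pos = read_lenenc(data, pos)
    ok.insert_id,     pos = read_lenenc(data, pos)
    ok.server_status, pos = read_uint16(data, pos)
    ok.warning_count, pos = read_uint16(data, pos)
    ok.message = data:sub(pos)
    return ok
end

-- the COMMIT response from above: 00 00 00 02 00 00 00
local commit_ok = parse_ok_packet("\000\000\000\002\000\000\000")
print(commit_ok.affected_rows, commit_ok.insert_id,
      commit_ok.server_status, commit_ok.warning_count)
```

For the COMMIT response, affected_rows, insert_id and warning_count are 0, and server_status is 2, which is the auto-commit bit.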
We can overwrite the response to the COMMIT and use affected_rows or insert_id for our needs and let the client access it.
Some configuration again, same as above:
trx_db = "repl"
trx_table = "trx"
trx_table_fld = "trx_id"
affected_rows = 0
The injection is a bit more interesting this time, as we now have to SELECT the transaction id. For performance reasons we use the session variable @trx_id.
function read_query(packet)
  if packet:byte() == proxy.COM_QUERY and packet:sub(2):lower() == "commit" then
    -- we are about to commit, increment the trx-id first
    if trx_db ~= proxy.connection.default_db then
      -- switch to the transaction db
      proxy.queries:append(2,
        string.char(proxy.COM_INIT_DB) .. trx_db)
    end

    -- increment the counter and keep the new value in @trx_id
    proxy.queries:append(3, string.char(proxy.COM_QUERY) ..
      "UPDATE " .. trx_table ..
      " SET " .. trx_table_fld .. " = @trx_id := ( " .. trx_table_fld .. " + 1 )")
    proxy.queries:append(4, string.char(proxy.COM_QUERY) ..
      "SELECT @trx_id")

    if proxy.connection.default_db ~= "" and trx_db ~= proxy.connection.default_db then
      -- and back to the DB of the user
      proxy.queries:append(5,
        string.char(proxy.COM_INIT_DB) .. proxy.connection.default_db)
    end
  end

  -- the query of the client (id 1) is sent after the injected ones
  proxy.queries:append(1, packet)

  return proxy.PROXY_SEND_QUERY
end
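The comparison packet:sub(2):lower() == "commit" only matches the bare word, so a client that sends "COMMIT;" or adds whitespace slips through. A slightly more tolerant check could look like the sketch below. is_commit() is a made-up helper, and COM_QUERY is defined locally here so that the snippet runs without the proxy (inside the proxy you would use proxy.COM_QUERY).

```lua
COM_QUERY = 3  -- command byte of a COM_QUERY packet in the MySQL protocol

-- is_commit() is a hypothetical helper: true if the packet is a
-- COM_QUERY whose statement is COMMIT, ignoring case, surrounding
-- whitespace and one trailing semicolon.
function is_commit(packet)
    if packet:byte() ~= COM_QUERY then
        return false
    end
    local sql = packet:sub(2):lower()
    return sql:match("^%s*commit%s*;?%s*$") ~= nil
end

print(is_commit(string.char(COM_QUERY) .. "COMMIT;"))
print(is_commit(string.char(COM_QUERY) .. "SELECT 1"))
```

This still ignores variants such as COMMIT WORK or a COMMIT hidden behind a comment; it is only meant to show the direction.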
Handling the result-set only has to extract the trx-id and store it internally for a while. In case we sent a COMMIT, we patch the response a bit and replace the affected rows:
function read_query_result(inj)
  if inj.id == 1 and inj.query:sub(2):lower() == "commit" then
    -- rebuild the OK packet of the COMMIT with our own affected_rows
    proxy.response = {
      type = proxy.MYSQLD_PACKET_RAW,
      packets = {
        "\000" .. -- fields
        -- affected_rows; string.char() is only a valid
        -- Length Coded Binary for values below 251
        string.char(affected_rows) ..
        "\000" .. -- insert_id
        -- keep server_status, warning_count and message
        inj.resultset.raw:sub(4)
      }
    }

    return proxy.PROXY_SEND_RESULT
  elseif inj.id == 4 then
    -- this should be the SELECT
    for row in inj.resultset.rows do
      affected_rows = row[1] + 0
    end

    return proxy.PROXY_IGNORE_RESULT
  elseif inj.id ~= 1 then
    return proxy.PROXY_IGNORE_RESULT
  end
end
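One caveat: string.char(affected_rows) is only a valid Length Coded Binary while the trx-id is below 251. Larger values need a prefix byte followed by a little-endian integer. A minimal sketch of such an encoder in plain Lua follows; both helper names are made up for illustration and are not part of the proxy API.

```lua
-- Encode value as size little-endian bytes
function uint_le(value, size)
    local bytes = {}
    for i = 1, size do
        bytes[i] = string.char(value % 256)
        value = math.floor(value / 256)
    end
    return table.concat(bytes)
end

-- Encode a non-negative integer as Length Coded Binary:
--   0 .. 250        one byte
--   up to 2^16 - 1  0xfc (252) + 2 bytes
--   up to 2^24 - 1  0xfd (253) + 3 bytes
--   larger          0xfe (254) + 8 bytes
function encode_lenenc(value)
    if value < 251 then
        return string.char(value)
    elseif value < 65536 then
        return "\252" .. uint_le(value, 2)
    elseif value < 16777216 then
        return "\253" .. uint_le(value, 3)
    end
    return "\254" .. uint_le(value, 8)
end

-- show the encoded bytes of 300 as hex
print((encode_lenenc(300):gsub(".", function(c)
    return string.format("%02x ", c:byte())
end)))
```

In read_query_result() the call to string.char(affected_rows) could then be replaced by encode_lenenc(affected_rows).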
Just to show that it works:
root@127.0.0.1:4040 [(none)]> commit;
Query OK, 40 rows affected (0.01 sec)
In your APIs you can use mysql_affected_rows() or similar calls to get access to this value.