MySQL database · Time: 2024-12-03 12:14:48
Author: article uploaded by a site member
MySQL: MGR Study (2): The Write Process of the Write Set
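My knowledge is limited; please excuse any mistakes.
Source version: 5.7.22
I. Recap of the previous article
The previous article, <<MySQL: MGR Study (1): Write Set>>, explained how the write set is generated. The write set, however, must then be packed into a Transaction_context_log_event and broadcast to the other nodes for certification. This article describes how the write set is written into that event and how it is broadcast.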
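As described in the previous article, the write set of the whole transaction is generated in the function binlog_log_row. In 5.7, every unique key of every row produces one write-set entry (although, as I learned from Song Libing, 8.0 no longer records write-set entries for unique keys). Each entry is an 8-byte uint64 value produced by a hash function, and Rpl_transaction_write_set_ctx stores the entries twice: once in a vector and once in a set. The more rows a transaction modifies, the more memory these hashes need. Eight bytes per entry is small, but for a large transaction that modifies tens of millions of rows, the memory consumption can reach hundreds of megabytes. The figure below illustrates the write-set memory that has been built up by the end of transaction execution (before commit).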
[Figure: in-memory layout of the transaction's write set before commit]
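To make the two containers concrete, here is a minimal C++ sketch of the bookkeeping described above. It is not the MySQL class: `WriteSetCtxSketch` is a hypothetical name, the key strings use an invented format, and `std::hash` merely stands in for the hash function MySQL actually applies (chosen by `transaction_write_set_extraction`). It only shows that every hash is kept twice, once in a vector that keeps duplicates and once in a set that removes them.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for Rpl_transaction_write_set_ctx.
class WriteSetCtxSketch {
 public:
  // Hash one key description into a 64-bit value and record it twice.
  // std::hash is only a placeholder for MySQL's real hash function.
  void add_key(const std::string& key) {
    uint64_t h = static_cast<uint64_t>(std::hash<std::string>{}(key));
    write_set_.push_back(h);      // vector: keeps every entry, duplicates included
    write_set_unique_.insert(h);  // set: one copy per distinct hash
  }

  const std::vector<uint64_t>& write_set() const { return write_set_; }
  const std::set<uint64_t>& write_set_unique() const { return write_set_unique_; }

  // Raw size of the hash values alone (8 bytes each). Container overhead
  // (vector capacity slack, set tree nodes) comes on top of this.
  std::size_t payload_bytes() const {
    return (write_set_.size() + write_set_unique_.size()) * sizeof(uint64_t);
  }

 private:
  std::vector<uint64_t> write_set_;
  std::set<uint64_t> write_set_unique_;
};
```

As a rough order of magnitude under these assumptions: a transaction touching 10,000,000 rows with a single primary key stores 10,000,000 × 8 B ≈ 80 MB of hashes in the vector alone, and each std::set node usually costs several times its 8-byte payload (tree pointers plus allocator overhead), which is how the total can plausibly climb into the hundreds of megabytes.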
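II. When the Transaction_context_log_event is generated
During transaction execution, map events, query events, DML (row) events and so on are generated and continuously written into the binlog cache, while the write-set entries are continuously added to Rpl_transaction_write_set_ctx in memory; all of this logic lives in binlog_log_row. The Transaction_context_log_event, however, is generated only at commit time, specifically after MYSQL_BIN_LOG::prepare and before MYSQL_BIN_LOG::ordered_commit. At that point the binlog events are still in the binlog cache and have not yet been written to the binlog file, so MGR's group-wide certification of the transaction happens before the binlog events are persisted. The call stack is as follows: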
#0  group_replication_trans_before_commit (param=0x7ffff0e7b8d0) at /root/softm/percona-server-5.7.22-22/rapid/plugin/group_replication/src/observer_trans.cc:511
#1  0x00000000014e4814 in Trans_delegate::before_commit (this=0x2e44800, thd=0x7fffd8000df0, all=false, trx_cache_log=0x7fffd8907a10, stmt_cache_log=0x7fffd8907858, cache_log_max_size=18446744073709547520) at /root/softm/percona-server-5.7.22-22/sql/rpl_handler.cc:325
#2  0x000000000188a386 in MYSQL_BIN_LOG::commit (this=0x2e7b440, thd=0x7fffd8000df0, all=false) at /root/softm/percona-server-5.7.22-22/sql/binlog.cc:8974
#3  0x0000000000f80623 in ha_commit_trans (thd=0x7fffd8000df0, all=false, ignore_global_read_lock=false) at /root/softm/percona-server-5.7.22-22/sql/handler.cc:1830
#4  0x00000000016ddab9 in trans_commit_stmt (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/transaction.cc:458
#5  0x00000000015d1a8d in mysql_execute_command (thd=0x7fffd8000df0, first_level=true) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5293
#6  0x00000000015d3182 in mysql_parse (thd=0x7fffd8000df0, parser_state=0x7ffff0e7e600) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:5901
#7  0x00000000015c6d16 in dispatch_command (thd=0x7fffd8000df0, com_data=0x7ffff0e7ed70, command=COM_QUERY) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1490
#8  0x00000000015c5aa3 in do_command (thd=0x7fffd8000df0) at /root/softm/percona-server-5.7.22-22/sql/sql_parse.cc:1021
#9  0x000000000170ebb0 in handle_connection (arg=0x3cd32d0) at /root/softm/percona-server-5.7.22-22/sql/conn_handler/connection_handler_per_thread.cc:312
#10 0x0000000001946140 in pfs_spawn_thread (arg=0x3c71630) at /root/softm/percona-server-5.7.22-22/storage/perfschema/pfs.cc:2190
#11 0x00007ffff7bc7851 in start_thread () from /lib64/libpthread.so.0
#12 0x00007ffff651290d in clone () from /lib64/libc.so.6
III. How the content sent for MGR global certification is built
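The following steps reflect my (admittedly shallow) understanding of the source code:
1. Take the current binlog cache as cache_log; it holds the map/query/DML events already generated during execution.
2. Create a new IO_CACHE, cache, as temporary storage for the Transaction_context_log_event and the Gtid_log_event.
3. Switch cache_log to READ mode and reinitialize its auxiliary state, such as the offsets.
4. Initialize the Transaction_context_log_event.
5. Scan the write_set_unique set in Rpl_transaction_write_set_ctx and copy its contents into the write_set array of a Transaction_write_set. Note that only the set is used here, not the vector; this step is simply a copy of the write set into the temporary variable write_set.
6. Fill the contents of write_set into the Transaction_context_log_event. Each entry is base64-encoded along the way, so what ends up in the event is the base64 form of the write set. Afterwards the temporary write_set is freed.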
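7. Write the Transaction_context_log_event into the cache created in step 2.
8. Create the Gtid_log_event. This only initializes it; the actual GTID has not been assigned yet.
9. Write the Gtid_log_event into the cache created in step 2.
10. Compare the combined size of cache and cache_log with group_replication_transaction_size_limit, i.e. check whether the transaction's binlog events exceed the configured limit.
11. Switch cache to READ mode and reinitialize its auxiliary state, such as the offsets.
12. Append cache and then cache_log to transaction_msg.
13. Flow control. I have not studied this closely; I will look into the flow-control mechanism in detail if I get the chance.
14. gcs_module sends transaction_msg to every node.
15. Block and wait for the transaction's certification result.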
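So the whole process is roughly:
hashed write set (a set) -> copied into the write_set variable (a plain array) -> base64-encoded into the Transaction_context_log_event -> combined with the other binlog events into transaction_msg -> gcs_module broadcasts transaction_msg to the other nodes -> wait for the certification result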
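Steps 7 to 12 boil down to a size check followed by concatenating two byte streams. The sketch below models just that with `std::string` buffers; `TransactionMessageSketch`, `BuildResult` and `build_transaction_message` are hypothetical names, and IO_CACHE handling is left out entirely. Following the source, a limit of 0 disables the check, the check applies only to DML transactions, and the plugin's own cache (Transaction_context_log_event + Gtid_log_event) is appended before the binlog cache.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for transaction_msg's byte buffer.
struct TransactionMessageSketch {
  std::string payload;
};

enum class BuildResult { kOk, kTooBig };

// cache:     bytes of the Transaction_context_log_event + Gtid_log_event
// cache_log: bytes of the binlog events produced during execution
// limit:     models group_replication_transaction_size_limit (0 = no limit)
BuildResult build_transaction_message(const std::string& cache,
                                      const std::string& cache_log,
                                      bool is_dml, uint64_t limit,
                                      TransactionMessageSketch* msg) {
  // Step 10: total size = binlog cache + plugin cache.
  uint64_t transaction_size = cache_log.size() + cache.size();
  if (is_dml && limit != 0 && transaction_size > limit) {
    return BuildResult::kTooBig;  // the real code logs an error and jumps to err
  }
  // Step 12: plugin cache first, then the binlog cache.
  msg->payload.clear();
  msg->payload += cache;
  msg->payload += cache_log;
  return BuildResult::kOk;
}
```

Because of this ordering, a receiving member reads the transaction context (carrying the write set) and the GTID event before the row events themselves.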
IV. Relevant source code
Excerpt from group_replication_trans_before_commit() in rapid/plugin/group_replication/src/observer_trans.cc:
  if (trx_cache_log_position > 0 && stmt_cache_log_position == 0)  // a transactional cache exists
  {
    cache_log= param->trx_cache_log;  // use it as the IO_CACHE
    cache_log_position= trx_cache_log_position;
  }
  else if (trx_cache_log_position == 0 && stmt_cache_log_position > 0)  // a statement cache exists
  {
    cache_log= param->stmt_cache_log;
    cache_log_position= stmt_cache_log_position;
    is_dml= false;
    may_have_sbr_stmts= true;
  }
  else
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "We can only use one cache type at a "
                "time on session %u", param->thread_id);
    shared_plugin_stop_lock->release_read_lock();
    DBUG_RETURN(1);
    /* purecov: end */
  }

  applier_module->get_pipeline_stats_member_collector()
      ->increment_transactions_local();

  DBUG_ASSERT(cache_log->type == WRITE_CACHE);
  DBUG_PRINT("cache_log", ("thread_id: %u, trx_cache_log_position: %llu,"
                           " stmt_cache_log_position: %llu",
                           param->thread_id, trx_cache_log_position,
                           stmt_cache_log_position));

  /*
    Open group replication cache.
    Reuse the same cache on each session for improved performance.
  */
  cache= observer_trans_get_io_cache(param->thread_id,
                                     param->cache_log_max_size);  // get a new IO_CACHE
  if (cache == NULL)  // error handling
  {
    /* purecov: begin inspected */
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Reinit binlog cache to read.
  if (reinit_cache(cache_log, READ_CACHE, 0))  // switch the IO_CACHE to READ mode and rewind it
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Failed to reinit binlog cache log for read "
                "on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  /*
    After this, cache_log should be reinit to old saved value when we
    are going out of the function scope.
  */
  reinit_cache_log_required= true;

  // Create transaction context.
  tcle= new Transaction_context_log_event(param->server_uuid,
                                          is_dml,
                                          param->thread_id,
                                          is_gtid_specified);  // initialize the Transaction_context_log_event
  if (!tcle->is_valid())
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL,
                "Failed to create the context of the current "
                "transaction on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  if (is_dml)
  {
    // Fetch the write set collected during execution and copy it into a
    // temporary buffer, write_set.
    Transaction_write_set* write_set= get_transaction_write_set(param->thread_id);
    /*
      When GTID is specified we may have empty transactions, that is,
      a transaction may have not write set at all because it didn't
      change any data, it will just persist that GTID as applied.
    */
    if ((write_set == NULL) && (!is_gtid_specified))
    {
      log_message(MY_ERROR_LEVEL, "Failed to extract the set of items written "
                  "during the execution of the current "
                  "transaction on session %u", param->thread_id);
      error= pre_wait_error;
      goto err;
    }

    if (write_set != NULL)
    {
      // Copy the whole write set into the Transaction_context_log_event;
      // from here on it lives inside the event.
      if (add_write_set(tcle, write_set))
      {
        /* purecov: begin inspected */
        cleanup_transaction_write_set(write_set);  // add_write_set failed: the temporary write set must still be freed
        log_message(MY_ERROR_LEVEL, "Failed to gather the set of items written "
                    "during the execution of the current "
                    "transaction on session %u", param->thread_id);
        error= pre_wait_error;
        goto err;
        /* purecov: end */
      }
      cleanup_transaction_write_set(write_set);  // the write set has done its job and is freed
      DBUG_ASSERT(is_gtid_specified || (tcle->get_write_set()->size() > 0));
    }
    else
    {
      /*
        For empty transactions we should set the GTID may_have_sbr_stmts.
        See comment at binlog_cache_data::may_have_sbr_stmts().
      */
      may_have_sbr_stmts= true;
    }
  }

  // Write transaction context to group replication cache.
  tcle->write(cache);  // write to the MGR cache; Log_event::write emits TCLE's header (virtual), body (virtual) and footer

  // Write Gtid log event to group replication cache.
  gle= new Gtid_log_event(param->server_id, is_dml, 0, 1,
                          may_have_sbr_stmts,
                          gtid_specification);
  gle->write(cache);  // write the GTID event to the MGR cache as a placeholder

  transaction_size= cache_log_position + my_b_tell(cache);
  if (is_dml && transaction_size_limit &&
      transaction_size > transaction_size_limit)
  {
    // group_replication_transaction_size_limit: maximum transaction size
    log_message(MY_ERROR_LEVEL, "Error on session %u. "
                "Transaction of size %llu exceeds specified limit %lu. "
                "To increase the limit please adjust group_replication_transaction_size_limit option.",
                param->thread_id, transaction_size,
                transaction_size_limit);
    error= pre_wait_error;
    goto err;
  }

  // Reinit group replication cache to read.
  if (reinit_cache(cache, READ_CACHE, 0))  // switch the IO_CACHE to READ mode and rewind it
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while re-initializing an internal "
                "cache, for read operations, on session %u",
                param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy group replication cache to buffer.
  if (transaction_msg.append_cache(cache))  // append to transaction_msg
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while appending data to an internal "
                "cache on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  // Copy binlog cache content to buffer.
  if (transaction_msg.append_cache(cache_log))  // append to transaction_msg
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while writing binary log cache on "
                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->registerTicket(param->thread_id))
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Unable to register for getting notifications "
                "regarding the outcome of the transaction on "
                "session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

#ifndef DBUG_OFF
  DBUG_EXECUTE_IF("test_basic_CRUD_operations_sql_service_interface",
                  {
                    DBUG_SET("-d,test_basic_CRUD_operations_sql_service_interface");
                    DBUG_ASSERT(!sql_command_check());
                  };);

  DBUG_EXECUTE_IF("group_replication_before_message_broadcast",
                  {
                    const char act[]= "now wait_for waiting";
                    DBUG_ASSERT(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
                  });
#endif

  /*
    Check if member needs to throttle its transactions to avoid
    cause starvation on the group.
  */
  applier_module->get_flow_control_module()->do_wait();  // flow control

  // Broadcast the Transaction Message
  send_error= gcs_module->send_message(transaction_msg);  // GCS broadcast
  if (send_error == GCS_MESSAGE_TOO_BIG)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error broadcasting transaction to the group "
                "on session %u. Message is too big.", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }
  else if (send_error == GCS_NOK)
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while broadcasting the transaction to "
                "the group on session %u", param->thread_id);
    error= pre_wait_error;
    goto err;
    /* purecov: end */
  }

  shared_plugin_stop_lock->release_read_lock();

  DBUG_ASSERT(certification_latch != NULL);
  if (certification_latch->waitTicket(param->thread_id))  // wait for the certification result
  {
    /* purecov: begin inspected */
    log_message(MY_ERROR_LEVEL, "Error while waiting for conflict detection "
                "procedure to finish on session %u", param->thread_id);
    error= post_wait_error;
    goto err;
    /* purecov: end */
  }
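The registerTicket()/waitTicket() pair on certification_latch is what makes the session thread block until certification finishes. The following simplified ticket latch, built on std::mutex and std::condition_variable, illustrates that pattern. It is a hypothetical stand-in, not the plugin's actual latch class, and unlike registerTicket()/waitTicket() (which return non-zero on error) its methods return true on success.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <thread>

// Hypothetical per-session ticket latch: the committing thread registers a
// ticket before broadcasting, then waits until another thread releases it.
class TicketLatchSketch {
 public:
  // Returns false if a ticket for this session already exists.
  bool register_ticket(uint32_t session_id) {
    std::lock_guard<std::mutex> lock(mu_);
    return released_.emplace(session_id, false).second;
  }

  // Called by the certifying side once the outcome is known.
  bool release_ticket(uint32_t session_id) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      auto it = released_.find(session_id);
      if (it == released_.end()) return false;
      it->second = true;
    }
    cv_.notify_all();
    return true;
  }

  // Blocks until the ticket is released, then removes it.
  bool wait_ticket(uint32_t session_id) {
    std::unique_lock<std::mutex> lock(mu_);
    auto it = released_.find(session_id);
    if (it == released_.end()) return false;
    cv_.wait(lock, [&it] { return it->second; });  // map iterators stay valid on insert
    released_.erase(it);
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::map<uint32_t, bool> released_;  // session id -> outcome published?
};

// Demo: a second thread plays the certifier for one session.
bool certify_round_trip(TicketLatchSketch& latch, uint32_t session_id) {
  if (!latch.register_ticket(session_id)) return false;
  std::thread certifier([&latch, session_id] { latch.release_ticket(session_id); });
  bool ok = latch.wait_ticket(session_id);
  certifier.join();
  return ok;
}
```

Registering before broadcasting matters: if the certifier finished before the ticket existed, its release would find nothing and the waiter would block forever.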
int add_write_set(Transaction_context_log_event *tcle,
                  Transaction_write_set *set)
{
  DBUG_ENTER("add_write_set");
  int iterator= set->write_set_size;  // loop count = number of write-set entries
  for (int i= 0; i < iterator; i++)
  {
    uchar buff[BUFFER_READ_PKE];
    int8store(buff, set->write_set[i]);  // store the 8-byte hash into the buffer
    uint64 const tmp_str_sz= base64_needed_encoded_length((uint64) BUFFER_READ_PKE);
    char *write_set_value= (char *) my_malloc(PSI_NOT_INSTRUMENTED,
                                              static_cast<size_t>(tmp_str_sz),
                                              MYF(MY_WME));  // 13 bytes: (gdb) p tmp_str_sz -> $2 = 13
    if (!write_set_value)  // memory allocation failed
    {
      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL, "No memory to generate write identification hash");
      DBUG_RETURN(1);
      /* purecov: end */
    }

    if (base64_encode(buff, (size_t) BUFFER_READ_PKE, write_set_value))  // base64-encode the hash
    {
      /* purecov: begin inspected */
      log_message(MY_ERROR_LEVEL,
                  "Base64 encoding of the write identification hash failed");
      DBUG_RETURN(1);
      /* purecov: end */
    }

    tcle->add_write_set(write_set_value);  // finally, the base64 form of the entry goes into the event
  }
  DBUG_RETURN(0);
}
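The 13 printed by gdb follows from base64 arithmetic: 8 bytes encode to ⌈8/3⌉ × 4 = 12 characters, plus a terminating NUL. The self-contained sketch below reproduces what the loop does for one hash, assuming int8store's little-endian byte order and the standard RFC 4648 alphabet with `=` padding (to my knowledge what MySQL's base64_encode produces for such short inputs). `store_le64`, `base64_encode_bytes` and `encode_write_set_hash` are illustrative names, not MySQL functions.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Store a 64-bit value in little-endian order, like int8store.
std::array<unsigned char, 8> store_le64(uint64_t v) {
  std::array<unsigned char, 8> buf{};
  for (int i = 0; i < 8; ++i) buf[i] = static_cast<unsigned char>(v >> (8 * i));
  return buf;
}

// Standard base64: RFC 4648 alphabet, '=' padding, no line breaks.
std::string base64_encode_bytes(const unsigned char* data, std::size_t len) {
  static const char kAlphabet[] =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  std::string out;
  for (std::size_t i = 0; i < len; i += 3) {
    // Pack up to three input bytes into a 24-bit group.
    uint32_t chunk = static_cast<uint32_t>(data[i]) << 16;
    if (i + 1 < len) chunk |= static_cast<uint32_t>(data[i + 1]) << 8;
    if (i + 2 < len) chunk |= static_cast<uint32_t>(data[i + 2]);
    // Emit four 6-bit symbols, padding where input bytes were missing.
    out += kAlphabet[(chunk >> 18) & 0x3F];
    out += kAlphabet[(chunk >> 12) & 0x3F];
    out += (i + 1 < len) ? kAlphabet[(chunk >> 6) & 0x3F] : '=';
    out += (i + 2 < len) ? kAlphabet[chunk & 0x3F] : '=';
  }
  return out;
}

// One write-set hash -> the text handed to the event in add_write_set().
std::string encode_write_set_hash(uint64_t hash) {
  std::array<unsigned char, 8> buf = store_le64(hash);
  return base64_encode_bytes(buf.data(), buf.size());
}
```

For example, the hash 1 becomes "AQAAAAAAAAA=": 12 characters, i.e. 13 bytes with the NUL. So each 8-byte hash costs 13 bytes plus its own heap allocation once it is in the event, in addition to the copies still held in Rpl_transaction_write_set_ctx.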
Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id)
{
  DBUG_ENTER("get_transaction_write_set");
  THD *thd= NULL;
  Transaction_write_set *result_set= NULL;
  Find_thd_with_id find_thd_with_id(m_thread_id, false);

  thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
  if (thd)
  {
    // Rpl_transaction_write_set_ctx: std::set<uint64> *get_write_set();
    std::set<uint64> *write_set= thd->get_transaction()
        ->get_transaction_write_set_ctx()->get_write_set();
    unsigned long write_set_size= write_set->size();  // number of elements in the set
    if (write_set_size == 0)
    {
      mysql_mutex_unlock(&thd->LOCK_thd_data);
      DBUG_RETURN(NULL);
    }

    result_set= (Transaction_write_set*) my_malloc(key_memory_write_set_extraction,
                                                   sizeof(Transaction_write_set),
                                                   MYF(0));  // allocate the Transaction_write_set itself
    result_set->write_set_size= write_set_size;  // record the size
    result_set->write_set=
        (unsigned long long*) my_malloc(key_memory_write_set_extraction,
                                        write_set_size *
                                        sizeof(unsigned long long),
                                        MYF(0));  // allocate the hash array
    int result_set_index= 0;
    // Copy the entries: note that they go from the std::set into a plain array.
    for (std::set<uint64>::iterator it= write_set->begin();
         it != write_set->end();
         ++it)
    {
      uint64 temp= *it;
      result_set->write_set[result_set_index++]= temp;
    }
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }
  DBUG_RETURN(result_set);
}
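get_transaction_write_set() flattens the std::set into a malloc'd array so that add_write_set() can index it. The sketch below mimics that shape with standard C++ only; `WriteSetCopySketch`, `copy_write_set` and `free_write_set` are hypothetical stand-ins for Transaction_write_set, the copy loop and cleanup_transaction_write_set(). Unlike the 5.7.22 code shown above, which does not check the my_malloc results, the sketch returns nullptr when an allocation fails.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <set>

// Simplified stand-in for Transaction_write_set: a length plus a flat array.
struct WriteSetCopySketch {
  unsigned long write_set_size;
  unsigned long long* write_set;
};

// Same shape as get_transaction_write_set(): nullptr for an empty set,
// otherwise a heap copy of the elements.
WriteSetCopySketch* copy_write_set(const std::set<uint64_t>& src) {
  if (src.empty()) return nullptr;
  auto* result =
      static_cast<WriteSetCopySketch*>(std::malloc(sizeof(WriteSetCopySketch)));
  if (result == nullptr) return nullptr;
  result->write_set_size = src.size();
  result->write_set = static_cast<unsigned long long*>(
      std::malloc(src.size() * sizeof(unsigned long long)));
  if (result->write_set == nullptr) {
    std::free(result);
    return nullptr;
  }
  unsigned long i = 0;
  for (uint64_t h : src) result->write_set[i++] = h;  // std::set iterates in ascending order
  return result;
}

// Counterpart of cleanup_transaction_write_set(): frees both allocations.
void free_write_set(WriteSetCopySketch* ws) {
  if (ws == nullptr) return;
  std::free(ws->write_set);
  std::free(ws);
}
```

Because the source is a std::set, the flat array comes out sorted and free of duplicates, which is why the vector copy in Rpl_transaction_write_set_ctx is not needed at this point.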