您正在查看: Surou 发布的文章

同步区块--p2p通信--notice_message

上篇笔记写到远程节点给本地节点发送notice消息,通知本地节点同步区块。本篇笔记继续学习本地节点如何从远程节点同步区块,这次重点写notice消息的发送与处理过程。

1、notice_message消息结构

//消息结构 定义部分
// Notice a peer sends to describe what it knows: one part for transactions
// and one part for blocks.
struct notice_message {
  notice_message () : known_trx(), known_blocks() {}
  ordered_txn_ids known_trx;     // transaction part (select_ids<transaction_id_type>)
  ordered_blk_ids known_blocks;  // block part (select_ids<block_id_type>)
};
//选择查看定义,跳转到
// select_ids instantiations used for notice_message::known_trx / known_blocks.
using ordered_txn_ids = select_ids<transaction_id_type>;
using ordered_blk_ids = select_ids<block_id_type>;
//选择查看定义,跳转到
// Generic payload for one half of a notice_message: a mode, a number and an
// optional list of ids.
// NOTE(review): the meaning of `pending` depends on the mode and the sender --
// in the handshake code it carries a block number (LIB or head) for the
// catch-up modes and a local transaction count for the known_trx catch_up notice.
template<typename T>
struct select_ids {
  select_ids () : mode(none),pending(0),ids() {}
  id_list_modes  mode;       // none, catch_up, last_irr_catch_up or normal
  uint32_t       pending;    // mode-dependent number (see note above)
  vector<T>      ids;        // explicit ids; T is the transaction or block id type
  bool           empty () const { return (mode == none || ids.empty()); }   // true when there is nothing to act on
};

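notice_message 由 known_trx(交易部分)和 known_blocks(区块部分)两部分组成,二者都是 select_ids 类型。select_ids 的 mode 字段类型为 id_list_modes,这是一个枚举类型:enum id_list_modes { none, catch_up, last_irr_catch_up, normal };。pending 字段是一个数值,其含义取决于 mode 和发送场景:在区块同步相关的通知中它表示区块编号(如不可逆区块号、最新区块号),而在 known_trx 的 catch_up 通知中它表示本地交易的数量。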

2、发送notice_message消息

远程节点给本地节点发送notice_message,通知本地节点同步到不可逆区块。

//远程节点给本地节点发送的通知消息内容 消息调用部分
// Excerpt from sync_manager::recv_handshake ("sync check state 2"): our LIB is
// ahead of the peer's head, so tell the peer to catch up to our LIB.
if (lib_num > msg.head_num ) {
   fc_dlog(logger, "sync check state 2");
   // only notify peers past their first handshake or on a newer protocol version
   if (msg.generation > 1 || c->protocol_version > proto_base) {
      notice_message note;
      note.known_trx.pending = lib_num;            // our last irreversible block number
      note.known_trx.mode = last_irr_catch_up;     // mode: catch up to the last irreversible block
      note.known_blocks.mode = last_irr_catch_up;
      note.known_blocks.pending = head;            // our fork-db head block number
      c->enqueue( note );
   }
   c->syncing = true;
   return;
}

远程节点发送的notice_message消息内容比较简单,包括不可逆区块数、最新区块数、同步方式(同步到不可逆区块或者同步到最新区块),然后放入到消息队列等待发送出去。

3、接收notice_message消息

同上篇笔记所写的一样,处理notice_message消息同样在一个handle_message的重载函数里进行,参数为当前通信的远程节点的connect对象指针和消息。

// Handles a notice_message from peer c. The transaction part may build a
// request_message (sent at the end if send_req is set); the block part either
// hands catch-up notices to sync_master or normal notices to dispatcher.
void net_plugin_impl::handle_message( connection_ptr c, const notice_message &msg) {
      // peer tells us about one or more blocks or txns. When done syncing, forward on
      // notices of previously unknown blocks or txns,
      //
      peer_ilog(c, "received notice_message");
      c->connecting = false;
      request_message req;     // reply, only enqueued when send_req becomes true
      bool send_req = false;
      if (msg.known_trx.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_trx.mode))("n",msg.known_trx.pending));
      }
      // --- transaction part of the notice ---
      switch (msg.known_trx.mode) {
      case none:
         break;
      case last_irr_catch_up: {
         // NOTE(review): the sender (recv_handshake, state 2) puts its LIB number in
         // known_trx.pending and its head number in known_blocks.pending, so this
         // records the peer's LIB as head_num -- confirm this is intended.
         c->last_handshake_recv.head_num = msg.known_trx.pending;
         req.req_trx.mode = none;
         break;
      }
      case catch_up : {
         if( msg.known_trx.pending > 0) {
            // plan to get all except what we already know about.
            req.req_trx.mode = catch_up;
            send_req = true;
            size_t known_sum = local_txns.size();
            if( known_sum ) {
               // list the ids we already hold so they can be excluded
               for( const auto& t : local_txns.get<by_id>( ) ) {
                  req.req_trx.ids.push_back( t.id );
               }
            }
         }
         break;
      }
      case normal: {
         dispatcher->recv_notice (c, msg, false);
      }   // no break needed: last case, control leaves the switch
      }

      if (msg.known_blocks.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_blocks.mode))("n",msg.known_blocks.pending));
      }
      // --- block part of the notice ---
      switch (msg.known_blocks.mode) {
      case none : {
         // NOTE(review): this early return also discards a request built above
         // (send_req == true), e.g. known_trx catch_up + known_blocks none as sent
         // by recv_handshake state 0 -- confirm this is intended.
         if (msg.known_trx.mode != normal) {
            return;
         }
         break;
      }
      case last_irr_catch_up: // catch-up to LIB and catch-up to head share one handler
      case catch_up: {
         sync_master->recv_notice(c,msg); // block syncing is driven by sync_master
         break;
      }
      case normal : {
         dispatcher->recv_notice (c, msg, false);
         break;
      }
      default: {
         peer_elog(c, "bad notice_message : invalid known_blocks.mode ${m}",("m",static_cast<uint32_t>(msg.known_blocks.mode)));
      }
      }
      fc_dlog(logger, "send req = ${sr}", ("sr",send_req));
      if( send_req) {
         c->enqueue(req);
      }
   }

由于远程节点发送的notice_message数据包内容为同步到不可逆区块,mode为last_irr_catch_up,所以代码中调用sync_master->recv_notice(c,msg)函数来进行区块同步。此函数同步区块分为两种情况

   // Handles the block part of a notice_message (called from handle_message for
   // catch_up and last_irr_catch_up block notices).
   //  - catch_up: the peer advertises its head block (id in known_blocks.ids,
   //    number in known_blocks.pending); verify_catchup() decides what to do.
   //  - last_irr_catch_up: the sender stores its LIB number in known_trx.pending
   //    and its head number in known_blocks.pending; we start a LIB sync.
   void sync_manager::recv_notice (connection_ptr c, const notice_message &msg) {
      fc_ilog (logger, "sync_manager got ${m} block notice",("m",modes_str(msg.known_blocks.mode)));
      if (msg.known_blocks.mode == catch_up) {
         if (msg.known_blocks.ids.size() == 0) {
            elog ("got a catch up with ids size = 0");
         }
         else {
            // catch up to the peer's head block
            verify_catchup(c,  msg.known_blocks.pending, msg.known_blocks.ids.back());
         }
      }
      else if (msg.known_blocks.mode == last_irr_catch_up) {
         c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
         reset_lib_num (c);
         // Sync up to the peer's LIB. start_sync() records its target as
         // sync_known_lib_num, and request_next_chunk() requests blocks up to
         // that number, so passing the peer's head (known_blocks.pending) would
         // make us request reversible blocks as if they were irreversible.
         start_sync(c, msg.known_trx.pending);
      }
   }

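我们重点看同步到不可逆区块的情况,对应的函数为start_sync,参数为connection对象指针和消息中的known_blocks.pending(即远程节点的最新区块号)。不过我觉得这里应该传入不可逆区块号。原因是:发送方在known_trx.pending中填入的是自己的不可逆区块号lib_num,在known_blocks.pending中填入的是最新区块号head;而start_sync会把target记录为已知的不可逆区块号sync_known_lib_num。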

   // Begin (or continue) catching up toward `target`, which is recorded as the
   // highest LIB number we know of. When no sync is required this only logs;
   // otherwise it leaves in_sync for lib_catchup (restarting at our LIB + 1)
   // and asks connection c for the next chunk of blocks.
   void sync_manager::start_sync( connection_ptr c, uint32_t target) {
      // keep the larger of the known LIB and the new target
      sync_known_lib_num = (target > sync_known_lib_num) ? target : sync_known_lib_num;

      if (sync_required()) {
         if (state == in_sync) {
            set_state(lib_catchup);
            sync_next_expected_num = chain_plug->chain().last_irreversible_block_num() + 1;
         }
         fc_ilog(logger, "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}",
                 ( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));
         request_next_chunk(c);
         return;
      }

      // nothing to fetch: just report where we stand
      controller& chain_ctl = chain_plug->chain();
      const uint32_t my_lib = chain_ctl.last_irreversible_block_num();
      const uint32_t my_head = chain_ctl.fork_db_head_block_num();
      fc_dlog( logger, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
               ("b",my_lib)("h",my_head)("t",target));
   }

在这里,成员变量sync_known_lib_num赋值为sync_known_lib_num和target的最大值。该成员变量表示已知的当前不可逆区块数,可见上一步同步到不可逆区块的调用应该为start_sync(c, msg.known_trx.pending);,在这个函数里,主要是检查区块同步信息,然后调用request_next_chunk(c)函数进行区块同步。

   // Request the next range of at most sync_req_span blocks, ending no later than
   // sync_known_lib_num (excerpt: the part that chooses `source` is omitted).
   void sync_manager::request_next_chunk( connection_ptr conn ) {
      uint32_t head_block = chain_plug->chain().fork_db_head_block_num();
      ······ // code omitted in this excerpt (presumably selects the `source` connection)
      // only issue a new request if we have not already requested up to the known LIB
      if( sync_last_requested_num != sync_known_lib_num ) {
         uint32_t start = sync_next_expected_num;
         uint32_t end = start + sync_req_span - 1; // sync_req_span: blocks per request (presumably from sync-fetch-span; the article says the default is 100)
         if( end > sync_known_lib_num )
            end = sync_known_lib_num;   // clamp the range to the known LIB
         if( end > 0 && end >= start ) {
            fc_ilog(logger, "requesting range ${s} to ${e}, from ${n}",
                    ("n",source->peer_name())("s",start)("e",end));
            source->request_sync_blocks(start, end);// sends a sync_request_message for [start, end]
            sync_last_requested_num = end;
         }
      }
   }

查看request_next_chunk函数的功能,我们发现内部调用request_sync_blocks函数来发送同步消息,具体实现需要进入分析。

// Ask this peer for blocks start..end (inclusive), then call sync_wait().
void connection::request_sync_blocks (uint32_t start, uint32_t end) {
      enqueue( net_message( sync_request_message{start, end} ) );
      sync_wait();   // presumably arms a timeout for the peer's response
   }

request_sync_blocks函数内部实现很简单,构造了一个sync_request_message类型的消息,然后放入到消息队列中,消息的内容为起始区块和结束区块,每次请求100个(config.ini中默认设置,可以修改)。
此时这篇笔记讨论的消息类型先到这里,下一篇笔记讨论sync_request_message类型的消息,分析本地节点是如何从远程同步区块的。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%94%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--notice_message.md

同步区块--p2p通信--handshake_message

上一篇整体上学习了net_plugin插件,但是具体的内容没有写。从这篇笔记开始,重点分析各节点之间是如何通过p2p插件同步区块的。首先,启动一个节点之后,当前节点向远程节点发送的第一个数据包就是handshake_message类型的(time时间戳类型除外),所以这篇笔记中分析handshake_message类型数据包的发送过程和接收过程。
运行环境:CLion IDE,并将节点配置为连接到主网节点。

1、handshake_message消息结构

// First message exchanged on a p2p connection: identifies the node and chain
// and reports our chain state (LIB and head), so both sides can decide how
// to sync.
struct handshake_message {
      uint16_t                   network_version = 0; ///< incremental value above a computed base
      chain_id_type              chain_id; ///< used to identify chain
      fc::sha256                 node_id; ///< used to identify peers and prevent self-connect
      chain::public_key_type     key; ///< authentication key; may be a producer or peer key, or empty
      tstamp                     time;
      fc::sha256                 token; ///< digest of time to prove we own the private key of the key above
      chain::signature_type      sig; ///< signature for the digest
      string                     p2p_address;
      uint32_t                   last_irreversible_block_num = 0;
      block_id_type              last_irreversible_block_id;
      uint32_t                   head_num = 0;
      block_id_type              head_id;
      string                     os;
      string                     agent;
      int16_t                    generation = 0; ///< per-connection handshake counter (send_handshake sets ++sent_handshake_count); was left uninitialized
   };

握手包内容包括:网络版本、chain_id、node_id、p2p_address、节点名称等配置信息,以及链的状态(当前节点的不可逆区块号、不可逆区块id、最新区块号、最新区块id)。其中区块号是指区块编号(1、2、3......),区块id是指32字节(256位)的hash值。

2、发送handshake_message消息

启动本地节点之后,会连接到配置文件中的p2p节点,并获取当前链信息、配置信息,然后构建handshake_message包并发送到其他p2p节点。

   // Build a fresh handshake for this peer and queue it. Each call bumps the
   // per-connection counter, which becomes the handshake's generation number.
   void connection::send_handshake( ) {
      auto& hs = last_handshake_sent;          // the message is kept on the connection
      handshake_initializer::populate(hs);     // fill in identity and chain-state fields
      hs.generation = ++sent_handshake_count;
      fc_dlog(logger, "Sending handshake generation ${g} to ${ep}",
              ("g",hs.generation)("ep", peer_name()));
      enqueue(hs);   // queue for sending to the peer
   }

发送的数据包内容如下:

//大小 348个字节  连接之后发送的报文
0040         5c 01 00 00 00 b6 04 ac a3 76 f2 06 b8 fc   2Ñ\....¶.¬£vò.¸ü
0050   25 a6 ed 44 db dc 66 54 7c 36 c6 c3 3e 3a 11 9f   %¦íDÛÜfT|6ÆÃ>:..
0060   fb ea ef 94 36 42 f0 e9 06 da b2 ea 2d 82 ce 71   ûêï.6Bðé.Ú²ê-.Îq
0070   45 c4 74 df 2f 3f 5f d9 df 11 af 8c 70 62 06 7a   EÄtß/?_Ùß.¯.pb.z
0080   5d de 3e 6b 87 10 22 18 0c 00 00 00 00 00 00 00   ]Þ>k..".........
0090   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
00a0   00 00 00 00 00 00 00 00 00 00 00 aa 8b b8 81 00   ...........ª.¸..
00b0   77 05 00 00 00 00 00 00 00 00 00 00 00 00 00 00   w...............
00c0   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
00d0   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
00e0   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
00f0   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0100   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0110   00 00 00 00 00 2d 6d 6f 6f 6e 69 6e 77 61 74 65   .....-mooninwate
0120   72 64 65 4d 61 63 42 6f 6f 6b 2d 50 72 6f 2e 6c   rdeMacBook-Pro.l
0130   6f 63 61 6c 3a 39 38 37 36 20 2d 20 64 61 62 32   ocal:9876 - dab2
0140   65 61 32 00 00 00 00 00 00 00 00 00 00 00 00 00   ea2.............
0150   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0160   00 00 00 00 00 00 00 01 00 00 00 00 00 00 01 40   ...............@
0170   51 47 47 7a b2 f5 f5 1c da 42 7b 63 81 91 c6 6d   QGGz²õõ.ÚB{c..Æm
0180   2c 59 aa 39 2d 5c 2c 98 07 6c b0 03 6f 73 78 10   ,Yª9-\,..l°.osx.
0190   22 45 4f 53 20 54 65 73 74 20 41 67 65 6e 74 22   "EOS Test Agent"
01a0   01 00  

3、接收handshake_message消息

eos接收到其他节点发送过来的消息后,根据消息类型的不同,重载了不同的handle_message函数,其中handshake_message类型,主要功能包括两方面:
(1)、验证接收到的handshake包内容中的链id、节点id、网络版本等信息。
(2)、对比接收到的消息中链的状态和自身链的状态对比,然后进行区块同步。

//重载后的函数, 处理handshake_message消息
// Handles an incoming handshake_message (excerpt; the dot runs mark code the
// article omitted). Rejects invalid handshakes with go_away(fatal_other),
// checks for duplicate connections and a matching chain_id, records the peer's
// node_id and handshake, then passes the peer's chain state to
// sync_master->recv_handshake().
void net_plugin_impl::handle_message( connection_ptr c, const handshake_message &msg) {
      peer_ilog(c, "received handshake_message");
      if (!is_valid(msg)) {
         peer_elog( c, "bad handshake message");
         c->enqueue( go_away_message( fatal_other ));
         return;
      }
      controller& cc = chain_plug->chain();
      uint32_t lib_num = cc.last_irreversible_block_num( );
      uint32_t peer_lib = msg.last_irreversible_block_num;
      if (msg.generation == 1) {
         if( c->peer_addr.empty() || c->last_handshake_recv.node_id == fc::sha256()) {
            fc_dlog(logger, "checking for duplicate" );
            // walk all connections (c is the current one) so that each p2p peer has only one connection
            ········
           }
         }
         if( msg.chain_id != chain_id) {
            elog( "Peer on a different chain. Closing connection");
            c->enqueue( go_away_message(go_away_reason::wrong_chain) );
            return;
         }
          ······
         if(  c->node_id != msg.node_id) {
            c->node_id = msg.node_id;
         }
        ········
      c->last_handshake_recv = msg;
      c->_logger_variant.reset();
      sync_master->recv_handshake(c,msg); // decide how to sync based on the peer's chain state
   }

主要功能在于recv_handshake函数中同步区块管理。

// Compares the peer's chain state (from its handshake) with ours and decides how
// to sync; see the numbered cases below. Every case returns early, so the final
// elog is unreachable as written.
void sync_manager::recv_handshake (connection_ptr c, const handshake_message &msg) {
      controller& cc = chain_plug->chain();
      uint32_t lib_num = cc.last_irreversible_block_num( );
      uint32_t peer_lib = msg.last_irreversible_block_num;
      reset_lib_num(c);
      c->syncing = false;

      //--------------------------------
      // sync need checks; (lib == last irreversible block)
      //
      // 0. my head block id == peer head id means we are all caught up block wise
      // 1. my head block num < peer lib - start sync locally
      // 2. my lib > peer head num - send an last_irr_catch_up notice if not the first generation
      //
      // 3  my head block num <= peer head block num - update sync state and send a catchup request
      // 4  my head block num > peer block num send a notice catchup if this is not the first generation
      //
      //-----------------------------

      uint32_t head = cc.fork_db_head_block_num( );
      block_id_type head_id = cc.fork_db_head_block_id();
      if (head_id == msg.head_id) {
         fc_dlog(logger, "sync check state 0");
         // notify peer of our pending transactions
         notice_message note;
         note.known_blocks.mode = none;
         note.known_trx.mode = catch_up;
         note.known_trx.pending = my_impl->local_txns.size();   // a transaction count, not a block number
         c->enqueue( note );
         return;
      }
      if (head < peer_lib) {
         fc_dlog(logger, "sync check state 1");
         // wait for receipt of a notice message before initiating sync
         // (peers on protocol versions older than proto_explicit_sync are synced to their LIB right away)
         if (c->protocol_version < proto_explicit_sync) {
            start_sync( c, peer_lib);
         }
         return;
      }
      if (lib_num > msg.head_num ) {
         fc_dlog(logger, "sync check state 2");
         // the peer is behind our LIB: tell it to catch up
         // (known_trx.pending = our LIB number, known_blocks.pending = our head number)
         if (msg.generation > 1 || c->protocol_version > proto_base) {
            notice_message note;
            note.known_trx.pending = lib_num;
            note.known_trx.mode = last_irr_catch_up;
            note.known_blocks.mode = last_irr_catch_up;
            note.known_blocks.pending = head;
            c->enqueue( note );
         }
         c->syncing = true;
         return;
      }

      if (head <= msg.head_num ) {
         fc_dlog(logger, "sync check state 3");
         // the peer's head is at or ahead of ours: verify_catchup decides what to request
         verify_catchup (c, msg.head_num, msg.head_id);
         return;
      }
      else {
         fc_dlog(logger, "sync check state 4");
         // we are ahead of the peer: advertise our head block so it can catch up
         if (msg.generation > 1 ||  c->protocol_version > proto_base) {
            notice_message note;
            note.known_trx.mode = none;
            note.known_blocks.mode = catch_up;
            note.known_blocks.pending = head;
            note.known_blocks.ids.push_back(head_id);
            c->enqueue( note );
         }
         c->syncing = true;
         return;
      }
      elog ("sync check failed to resolve status");   // unreachable: both branches above return
   }

这里的对比区块信息,主要分为五种情况:
(1)、接收到的最新区块id与自身最新区块id相同。
(2)、自身最新区块数小于接收到的区块的不可逆数。(自己的最新区块高度比远程节点的不可逆区块高度还要低,需要同步。)
(3)、自身不可逆区块数大于接收到的最新区块数。(与(2)相反,通知远程节点,需要同步。发送的是notice_message类型的消息,下一篇笔记中再写。)
(4)、自身最新区块数小于或等于接收到的最新区块数。(需要同步,发送的是request_message消息,后面再写这种消息类型。)
(5)、自身最新区块数大于接收到的最新区块数。(与(4)相反,通知远程节点需要同步。发送的是notice_message类型的消息,下一篇笔记中再写。)
以上便是handshake_message消息的发送与接收过程。此篇笔记写到本地节点将自己的配置信息、链的状态告诉远程节点,远程节点接收到这些信息之后,发现自身链比本地节点的链更长,所以给本地节点发送消息同步区块。(消息类型为notice_message,下一篇分析本地节点接收到notice_message类型的消息之后,如何同步区块)。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E5%9B%9B%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--handshake_message.md

同步区块--p2p通信--sync_request_message与signed_block

这篇笔记写同步区块过程中,发送的最后两种消息类型--sync_request_message与signed_block。上篇笔记写到远程节点将链的信息(不可逆区块数、最新区块数、同步方式等)作为notice_message方式发送给本地节点,本地节点接收到notice_message消息,向远程节点发送sync_request_message消息同步区块。

1、同步区块过程图示

本地节点从远程节点同步不可逆区块

2、sync_request_message消息接收处理

继续上篇笔记中的内容,远程节点接收到本地节点发送过来的sync_request_message消息之后,处理函数:

   // A peer asks us to stream blocks [start_block, end_block]. An end_block of 0
   // instead clears any outstanding request and flushes the connection's queues.
   void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
      if( msg.end_block != 0 ) {
         // track progress: `last` starts one before the first block to send
         c->peer_requested = sync_state( msg.start_block,msg.end_block,msg.start_block-1);
         c->enqueue_sync_block();   // queue the first block; later ones follow each completed write
         return;
      }
      c->peer_requested.reset();
      c->flush_queues();
   }

本地节点发送的同步消息,默认为1-100的区块数,则处理函数进入enqueue_sync_block

   // Queue the next block of the range the peer requested with a
   // sync_request_message (tracked in peer_requested). Called when the request
   // arrives and again after each completed write, so blocks go out one by one.
   // Returns true if a block was queued; false if there is no active request,
   // the requested range is invalid, or the block could not be fetched.
   bool connection::enqueue_sync_block() {
      controller& cc = app().find_plugin<chain_plugin>()->chain();
      if (!peer_requested)
         return false;
      uint32_t num = ++peer_requested->last;
      // A malformed request with start_block > end_block would never satisfy
      // `last == end_block` below, and we would keep streaming blocks up to our
      // head. Drop such a range instead of sending anything.
      if (num > peer_requested->end_block) {
         peer_requested.reset();
         return false;
      }
      // first block of the range (presumably tells enqueue to start writing right away)
      bool trigger_send = num == peer_requested->start_block;
      if(peer_requested->last == peer_requested->end_block) {
         peer_requested.reset();   // this is the last block of the range
      }
      try {
         signed_block_ptr sb = cc.fetch_block_by_number(num);
         if(sb) {
            enqueue( *sb, trigger_send);
            return true;
         }
      } catch ( ... ) {
         wlog( "write loop exception" );
      }
      return false;
   }

num变量为peer_requested成员的last字段的值,表示当前需要同步的区块数。每次调用enqueue_sync_block函数,则该字段加一,即发送下一个区块给本地节点。然后调用fetch_block_by_number函数获取自己的第num个区块,并构造signed_block消息,然后放入到消息队列,这样就把本地节点请求的一个区块发送给了本地节点。

  boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) {
    try {
      ········//省略代码
        while (conn->out_queue.size() > 0) {
            conn->out_queue.pop_front();
        }
        conn->enqueue_sync_block();    ///同步下一个区块
        conn->do_queue_write();
      }
      ······ //省略代码
   }

在net_plugin插件处理消息队列的时候,会在异步发送消息的回调函数里,发送下一个区块给本地节点。

3、接收signed_block消息

本地节点接收远程节点发送过来的signed_block消息,消息内容为区块详细数据

   // Handles a block received from peer c. Blocks we already have are only
   // reported to sync_master. Other blocks are passed to dispatcher, then to
   // chain_plugin::accept_block(). The exception type is mapped to a
   // go_away_reason: on no_reason the matching local/peer transaction entries
   // are updated and sync_master->recv_block() is called; otherwise
   // sync_master->rejected_block() is called.
   void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
      controller &cc = chain_plug->chain();
      block_id_type blk_id = msg.id();
      uint32_t blk_num = msg.block_num();
      fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));
      c->cancel_wait();

      try {
         // do we already have a block with this id?
         if( cc.fetch_block_by_id(blk_id)) {
            sync_master->recv_block(c, blk_id, blk_num);
            return;
         }
      } catch( ...) {
         // should this even be caught?
         elog("Caught an unknown exception trying to recall blockID");
      }

      dispatcher->recv_block(c, blk_id, blk_num);
      fc::microseconds age( fc::time_point::now() - msg.timestamp);
      peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",
              ("n",blk_num)("age",age.to_seconds()));

      go_away_reason reason = fatal_other;   // stays fatal_other unless a branch below changes it
      try {
        // hand the block to the chain (the article's author notes accept_block goes through boost signals2 and was not traced further)
         signed_block_ptr sbp = std::make_shared<signed_block>(msg);
         chain_plug->accept_block(sbp); //, sync_master->is_active(c));
         reason = no_reason;
      } catch( const unlinkable_block_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         reason = unlinkable;
      } catch( const block_validate_exception &ex) {
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "block_validate_exception accept block #${n} syncing from ${p}",("n",blk_num)("p",c->peer_name()));
         reason = validation;
      } catch( const assert_exception &ex) {
         // reason stays fatal_other
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));
      } catch( const fc::exception &ex) {
         // NOTE(review): reason is set to no_reason here, so a block whose acceptance threw a
         // generic fc::exception is still treated as accepted below -- confirm intended.
         peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
         elog( "accept_block threw a non-assert exception ${x} from ${p}",( "x",ex.to_string())("p",c->peer_name()));
         reason = no_reason;
      } catch( ...) {
         // NOTE(review): the "num" argument has no matching ${num} placeholder in this message
         peer_elog(c, "bad signed_block : unknown exception");
         elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name()));
      }

      update_block_num ubn(blk_num);   // modifier applied to matching transaction entries (presumably records blk_num)
      if( reason == no_reason ) {
         for (const auto &recpt : msg.transactions) {
            // a receipt holds either a bare transaction id or a packed transaction
            auto id = (recpt.trx.which() == 0) ? recpt.trx.get<transaction_id_type>() : recpt.trx.get<packed_transaction>().id();
            auto ltx = local_txns.get<by_id>().find(id);
            if( ltx != local_txns.end()) {
               local_txns.modify( ltx, ubn );
            }
            auto ctx = c->trx_state.get<by_id>().find(id);
            if( ctx != c->trx_state.end()) {
               c->trx_state.modify( ctx, ubn );
            }
         }
         sync_master->recv_block(c, blk_id, blk_num);
      }
      else {
         sync_master->rejected_block(c, blk_num);
      }
   }

signed_block 消息内容
signed_block 区块报文内容

0040             b9 00 00 00 07 dc f9 5c 45 00 00 00 00 00   .F¹....Üù\E.....
0050   ea 30 55 00 00 00 00 00 01 40 51 47 47 7a b2 f5   ê0U......@QGGz²õ
0060   f5 1c da 42 7b 63 81 91 c6 6d 2c 59 aa 39 2d 5c   õ.ÚB{c..Æm,Yª9-\
0070   2c 98 07 6c b0 00 00 00 00 00 00 00 00 00 00 00   ,..l°...........
0080   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00   ................
0090   00 00 00 00 00 e0 24 4d b4 c0 2d 68 ae 64 de c1   .....à$M´À-h®dÞÁ
00a0   60 31 0e 24 7b b0 4e 5c b5 99 af b7 c1 47 10 fb   `1.${°N\µ.¯·ÁG.û
00b0   f3 f4 57 6c 0e 00 00 00 00 00 00 00 20 60 15 f0   óôWl........ `.ð
00c0   39 e2 fd d0 df b2 31 6f ea 28 67 90 c6 b1 55 4f   9âýÐß²1oê(g.ƱUO
00d0   28 5a 54 e7 d2 5d b3 ea 91 ef 2e 8c 11 0a 57 e2   (ZTçÒ]³ê.ï....Wâ
00e0   8e fb ff e0 92 c0 e0 2d 92 f2 88 5e 72 48 43 b4   .ûÿà.Àà-.ò.^rHC´
00f0   a5 5f 29 0e 20 9f 87 1f 41 bb 39 3c 84 00 00      ¥_). ...A»9<...:

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E5%85%AD%EF%BC%89%E5%90%8C%E6%AD%A5%E5%8C%BA%E5%9D%97--p2p%E9%80%9A%E4%BF%A1--sync_request_message%E4%B8%8Esigned_block.md

分析nodeos的流程

本篇笔记主要分析nodeos程序的流程

1、代码所在路径(yourpath换成你的路径)
yourpath/eos/programs/nodeos/main.cpp
2、首先main函数中的代码并不长,可以通过五个类来学习这段代码。

      // Excerpt from nodeos main(): configure the application singleton,
      // initialize the listed plugins, then start them and run the event loop.
      app().set_version(eosio::nodeos::config::version);
      app().register_plugin<history_plugin>();

      auto root = fc::app_path();
      app().set_default_data_dir(root / "eosio/nodeos/data" );
      app().set_default_config_dir(root / "eosio/nodeos/config" );
      http_plugin::set_defaults({
         .address_config_prefix = "",
         .default_unix_socket_path = "",
         .default_http_port = 8888
      });
      // initialize() returns false on failure, in which case nodeos exits early
      if(!app().initialize<chain_plugin, http_plugin, net_plugin, producer_plugin>(argc, argv))
         return INITIALIZE_FAIL;
      initialize_logging();
      ilog("nodeos version ${ver}", ("ver", app().version_string()));
      ilog("eosio root is ${root}", ("root", root.string()));
      app().startup();   // start every initialized plugin
      app().exec();      // run the io_service loop, then shut down

上面的代码,均是通过调用app()的成员函数来实现的。我们很明显的看到main函数的生命周期和app()返回的对象是完全相同的。所以,第一个分析的就是app()返回的类对象。

1、application类(单例模式)

查看app()函数,发现其返回的是一个application类对象的引用,该对象由application类的静态方法instance提供。instance方法中定义了一个函数内的静态局部变量_app,并返回其引用;静态局部变量只在第一次调用时初始化一次。因此无论调用多少次app(),得到的都是同一个application对象,由此实现了单例模式。
// Global accessor for the single application object: instance() returns a
// function-local static, constructed on the first call (Meyers singleton).
application& app() { return application::instance(); }
application& application::instance() { static application _app; return _app; }
main函数中出现的set_version、set_default_data_dir、set_default_config_dir均为简单的成员函数,这里略过不写。
主要分析一下其他几个函数:register_plugin、initialize、startup、exec。
register_plugin函数是一个模板函数,功能是对插件进行注册。注册即创建一个插件对象,并保存在application对象的plugins变量里面。注册插件的时候是需要注册其依赖插件的。比如p2p插件依赖于chain插件,那么在注册p2p插件时,也需要注册chain插件。其中获取请求插件,采用了一个宏实现了递归获取的方式,后续在plugins类中详细说明。

 // Register Plugin with the application. If it is already registered, return the
 // existing instance; otherwise create it, store it in `plugins` under its name,
 // and register the plugins it depends on (which recurses through their deps).
 template<typename Plugin>
    auto& register_plugin() {
    if (auto already = find_plugin<Plugin>())
       return *already;

    Plugin* fresh = new Plugin();
    plugins[fresh->name()].reset(fresh);   // `plugins` takes ownership
    fresh->register_dependencies();        // registers each APPBASE_PLUGIN_REQUIRES entry
    return *fresh;
 }

initialize函数是一个变参模板函数,功能是对插件进行初始化(调用每个插件的initialize方法)。内部将变参参数初始化为一个vector向量,并调用initialize_impl函数来具体实现。

  //application.hpp
  // Initialize the named plugins from the command line: look each one up with
  // find_plugin and pass the list, with argc/argv, to initialize_impl.
  template<typename... Plugin>
     bool                 initialize(int argc, char** argv) {
     return initialize_impl(argc, argv, {find_plugin<Plugin>()...});
  }
 //application.cpp -- excerpt: call initialize() on every autostart plugin
 // that is still only in the `registered` state.
  for (auto plugin : autostart_plugins)
       if (plugin != nullptr && plugin->get_state() == abstract_plugin::registered)
           plugin->initialize(options);

startup函数很简单,功能用来启动初始化过的插件,内部调用每个插件的startup函数来启动。
for (auto plugin : initialized_plugins) plugin->startup();
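exec函数使用的是application对象持有的boost::asio::io_service(io_serv);各插件中用到异步IO的地方(例如通过app().get_io_service()创建定时器和套接字),使用的也都是这个io服务。exec为SIGINT、SIGTERM、SIGPIPE三个信号分别注册了异步处理函数,收到其中任一信号时调用quit()结束运行。io_serv->run()返回之后,再调用shutdown()执行同步关闭。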

void application::exec() {
   // Call quit() when SIGINT, SIGTERM or SIGPIPE arrives. Each signal gets its
   // own signal_set, kept alive by the shared_ptr captured in its handler; once
   // the handler has fired it cancels its own set.
   const int handled_signals[] = { SIGINT, SIGTERM, SIGPIPE };
   for (int signo : handled_signals) {
      std::shared_ptr<boost::asio::signal_set> watcher(new boost::asio::signal_set(*io_serv, signo));
      watcher->async_wait([watcher,this](const boost::system::error_code& err, int num) {
        quit();
        watcher->cancel();
      });
   }

   io_serv->run();   // process events until the io_service stops

   shutdown(); /// perform synchronous shutdown
}

2、abstract_plugin类

通过对application类的几个函数的分析,发现初始化启动插件真正的实现,是通过插件自身的initialize方法和startup方法来实现的,下面主要分析其他四个类--插件类。这四个类可以用一个图来表示:
插件类示意图
其中abstract_plugin是抽象基类(声明了一组纯虚函数),plugin是它的子类,http_plugin是plugin的子类,http_plugin_impl包含在http_plugin中。即前三个类是继承关系,后面一个类是包含关系。
回到上面遗留的问题--“插件是如何注册其依赖插件的”。
plug->register_dependencies();
父类指针指向的是子类对象,该方法在plugin类中实现。

  // Register the plugins this plugin requires. The registration happens inside
  // plugin_requires (generated by APPBASE_PLUGIN_REQUIRES); the lambda does nothing.
  // static_cast<Impl*>(this) reaches the derived plugin class (CRTP).
  virtual void register_dependencies() {
       static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
 }

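plugin_requires函数并非plugin类的成员函数,而是其子类(具体插件类)的成员函数。所以需要先通过static_cast<Impl*>(this)把this转换为子类指针(即CRTP写法),才可以调用。该函数由一个宏生成:宏为每个依赖插件生成一次register_plugin调用,而register_plugin又会调用该依赖插件的register_dependencies,从而递归地注册整条依赖链。BOOST_PP_SEQ_FOR_EACH(macro, data, seq)宏的作用是:对序列seq中的每个元素elem,展开一次macro(r, data, elem)。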
APPBASE_PLUGIN_REQUIRES((chain_plugin))

//Macro code:
// APPBASE_PLUGIN_REQUIRES_VISIT: expanded once per listed plugin `elem`;
// registers it with the application and passes it to `visitor`.
#define APPBASE_PLUGIN_REQUIRES_VISIT( r, visitor, elem ) \
  visitor( appbase::app().register_plugin<elem>() ); 

// APPBASE_PLUGIN_REQUIRES((a)(b)...): defines plugin_requires(l), which
// expands APPBASE_PLUGIN_REQUIRES_VISIT for every plugin in the sequence.
#define APPBASE_PLUGIN_REQUIRES( PLUGINS )                               \
   template<typename Lambda>                                           \
   void plugin_requires( Lambda&& l ) {                                \
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, PLUGINS ) \
   }
//----------------------------------------- expansion result -------------------------------------------
// Expanding APPBASE_PLUGIN_REQUIRES((chain_plugin)):
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      BOOST_PP_SEQ_FOR_EACH( APPBASE_PLUGIN_REQUIRES_VISIT, l, (chain_plugin) ) 
   }
// Expanding further (one VISIT per sequence element):
template<typename Lambda>                                           
   void plugin_requires( Lambda&& l ) {                                
      l( appbase::app().register_plugin<chain_plugin>() );
   }

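所以plugin_requires是具体插件类的成员函数(以声明了APPBASE_PLUGIN_REQUIRES((chain_plugin))的net_plugin为例),参数为一个lambda表达式。调用过程为static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});,传入的表达式为[&](auto& plug){}。
调用此表达式时,参数为appbase::app().register_plugin<chain_plugin>()的返回值。由于register_plugin内部又会调用被依赖插件的register_dependencies,这里借助宏实现了依赖插件的递归注册。这种写法之前没有遇到过,比较少见。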
再来看abstract_plugin类。此类很简单,代码很少,声明了插件的初始化、启动、停止等几个纯虚方法,是一个抽象基类。具体代码均由其派生类多态实现,所以下面分析另外三个类。

     // Pure virtual interface of abstract_plugin, implemented by every plugin:
     virtual void set_program_options( options_description& cli, options_description& cfg ) = 0;
     virtual void initialize(const variables_map& options) = 0;
     virtual void startup() = 0;
     virtual void shutdown() = 0;

3、plugin类

plugin类是eos各个插件的父类,保存了插件的状态(注册、启动等),实现了初始化、启动等接口,内部逻辑是通过子类来实现的。

  // Register this plugin's dependencies. Registration happens inside
  // plugin_requires; the lambda passed to it does nothing.
  virtual void register_dependencies() {
            static_cast<Impl*>(this)->plugin_requires([&](auto& plug){});
         }

         // Initialize dependencies first, then this plugin; only runs while the
         // plugin is still in the `registered` state.
         virtual void initialize(const variables_map& options) override {
            if(_state == registered) {
               _state = initialized;   // set first, so a repeated call does nothing
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });
               static_cast<Impl*>(this)->plugin_initialize(options);
               //ilog( "initializing plugin ${name}", ("name",name()) );
               app().plugin_initialized(*this);
            }
            assert(_state == initialized); /// if initial state was not registered, final state cannot be initialized
         }

         // Start dependencies first, then this plugin; only runs while the plugin
         // is in the `initialized` state.
         virtual void startup() override {
            if(_state == initialized) {
               _state = started;   // set first, so a repeated call does nothing
               static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.startup(); });
               static_cast<Impl*>(this)->plugin_startup();
               app().plugin_started(*this);
            }
            assert(_state == started); // if initial state was not initialized, final state cannot be started
         }

4、net_plugin类(同级插件:http_plugin、chain_plugin等)

net_plugin类是plugin类的子类,该类主要实现了p2p插件的插件配置、初始化、启动、停止、广播区块的方法,但其内部实现均是通过的net_plugin_impl类的方法来完成的,net_plugin类包含一个net_plugin_impl类的实例(每个插件都是类似的结构)。

   // Public face of the p2p plugin. It declares chain_plugin as a dependency and
   // holds its implementation in a private net_plugin_impl (`my`).
   class net_plugin : public appbase::plugin<net_plugin>
   {
      public:
        net_plugin();
        virtual ~net_plugin();

        APPBASE_PLUGIN_REQUIRES((chain_plugin))   // generates plugin_requires() for chain_plugin
        virtual void set_program_options(options_description& cli, options_description& cfg) override;

        // lifecycle hooks called through appbase::plugin<net_plugin>
        void plugin_initialize(const variables_map& options);
        void plugin_startup();
        void plugin_shutdown();

        void   broadcast_block(const chain::signed_block &sb);

        // runtime peer management
        string                       connect( const string& endpoint );
        string                       disconnect( const string& endpoint );
        optional<connection_status>  status( const string& endpoint )const;
        vector<connection_status>    connections()const;

        size_t num_peers() const;
      private:
        std::unique_ptr<class net_plugin_impl> my;   // pimpl: implementation lives in net_plugin_impl
   };

plugin_initialize用于初始化插件,其本质是根据配置项设置net_plugin_impl对象的成员变量,例如创建sync_master和dispatcher。my是指向net_plugin_impl对象的std::unique_ptr。

  void net_plugin::plugin_initialize( const variables_map& options ) {
      ilog("Initialize net plugin");
      try {
         // 读取配置信息,初始化net_plugin_imul 对象的成员变量   
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

plugin_startup函数启动了一个p2p节点网络。包括1、设置监听循环,对其他节点发送过来的消息进行响应。2、根据配置文件中的seed节点信息,连接到其他节点,并发送消息请求,同步区块等消息(详细笔记写在下一篇)。通信用到的消息类型共分为如下几种:

  // Every message exchanged between peers is one alternative of this
  // static_variant; each type has its own handle_message overload.
  using net_message = static_variant<handshake_message,
                 chain_size_message,
                 go_away_message,
                 time_message,
                 notice_message,
                 request_message,
                 sync_request_message,
                 signed_block,
                 packed_transaction>;

5、net_plugin_impl类

该类涉及到的是核心业务的具体实现。也是最复杂的一个类,下一篇笔记重点写下net插件的学习过程。
net_plugin_impl类通信使用的是boost::asio异步通信的库。该类成员变量包括了当前连接的p2p节点对象、区块同步管理对象、链id、节点id、网络通信用的相关变量。

   // Implementation of the p2p plugin (excerpt; "......." marks omitted members).
   // Holds the listening socket, peer limits and keys, the set of live
   // connections, and the sync / dispatch managers.
   class net_plugin_impl {
   public:
      unique_ptr<tcp::acceptor>        acceptor;          // listening socket; may be null (checked before use)
      tcp::endpoint                    listen_endpoint;
      string                           p2p_address;
      uint32_t                         max_client_count = 0;   // 0 means no limit on inbound clients
      uint32_t                         max_nodes_per_host = 1;
      uint32_t                         num_clients = 0;

      vector<string>                   supplied_peers;
      vector<chain::public_key_type>   allowed_peers; ///< peer keys allowed to connect
      std::map<chain::public_key_type,
               chain::private_key_type> private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes

      // bit flags describing which peers may connect
      enum possible_connections : char {
         None = 0,
            Producers = 1 << 0,
            Specified = 1 << 1,
            Any = 1 << 2
            };
      possible_connections             allowed_connections{None};

      connection_ptr find_connection( string host )const;

      std::set< connection_ptr >       connections;               // all current peer connections (inbound ones are added by start_listen_loop)
      bool                             done = false;
      unique_ptr< sync_manager >       sync_master;               // drives block synchronization
      unique_ptr< dispatch_manager >   dispatcher;                // handles normal-mode notices and received blocks
      .......
   };

net_plugin插件的详细内容,下一篇笔记详细写。

转载自:https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%BA%8C%EF%BC%89%E5%88%86%E6%9E%90nodeos%E7%9A%84%E6%B5%81%E7%A8%8B.md

net_plugin插件代码分析

上一篇笔记中,写了nodeos的执行流程。nodeos采用插件体系,业务代码分布在一个个插件类中,上一篇也分析了插件类之间的继承关系。本篇笔记从其中的net插件入手来分析p2p模块的功能,也就是分析学习net_plugin_impl类。同之前的笔记一样,按照net插件的生命周期(初始化、启动、停止)来分析,并重点分析p2p模块的运行状态。
一个区块链系统的p2p模块,应该包括以下几个职能:
(1)、从对等的节点那里,同步区块数据。
(2)、发送交易给其他节点进行验证。
(3)、验证其他节点发送过来的交易。
(4)、如果是自己生产的区块,要把区块发送给其他节点。
(5)、验证其他节点发送过来的区块。

1、net_plugin类的plugin_initialize方法(初始化)

此方法主要是用来使用命令行参数或者配置文件中的参数来配置net_plugin_impl类,该插件的业务主要在net_plugin_impl类中实现。下图为执行完plugin_initialize方法后的net_plugin_impl类对象
net_plugin_impl初始化之后

 // Read configuration options and initialize the net_plugin_impl member variables (excerpt)
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

         my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
         my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
         my->txn_exp_period = def_txn_expire_wait;
         my->resp_expected_period = def_resp_expected_wait;
         my->dispatcher->just_send_it_max = options.at( "max-implicit-request" ).as<uint32_t>();
         my->max_client_count = options.at( "max-clients" ).as<int>();
         my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
         my->num_clients = 0;
         my->started_sessions = 0;
        ······
         my->keepalive_timer.reset( new boost::asio::steady_timer( app().get_io_service()));
         my->ticker();        // start the keepalive timer that periodically sends a timestamp message to every connection

plugin_initialize函数主要用于初始化net_plugin_impl对象,并调用ticker()启动一个定时器,每隔keepalive_interval(默认32秒)给已连接的节点发送心跳数据(时间戳数据)。其中send_time发送的消息是该模块定义的几种消息类型之一(time_message)。

   // Keepalive heartbeat: after each keepalive_interval, send a time message to
   // every connection whose socket is open. The next tick is scheduled first,
   // before the error check, so the heartbeat keeps running.
   void net_plugin_impl::ticker() {
      keepalive_timer->expires_from_now (keepalive_interval);
      keepalive_timer->async_wait ([this](boost::system::error_code ec) {
            ticker ();   // re-arm for the next tick
            if (ec) {
               wlog ("Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()));
            }
            for (const auto& conn : connections) {
               if (!conn->socket->is_open())
                  continue;
               conn->send_time();
            }
         });
   }

2、net_plugin类的plugin_startup方法(启动运行)

plugin_startup方法是核心方法,包含了网络监听循环、接收数据处理、发送数据等内容。
等待连接部分:绑定、监听,在start_listen_loop函数里,等待其他节点的连接。通过boost::asio实现异步IO,不会阻塞。

      if( my->acceptor ) {
         // open the acceptor using the listen endpoint's protocol (v4 or v6)
         my->acceptor->open(my->listen_endpoint.protocol());
         // SO_REUSEADDR, to avoid "Address already in use" when rebinding
         my->acceptor->set_option(tcp::acceptor::reuse_address(true));
         try {
           // bind to the configured listen endpoint
           my->acceptor->bind(my->listen_endpoint);
         } catch (const std::exception&) {
           elog("net_plugin::plugin_startup failed to bind to port ${port}",
             ("port", my->listen_endpoint.port()));
           // Rethrow the original exception object. `throw e;` would throw a sliced
           // copy of static type std::exception and lose the concrete type (e.g.
           // boost::system::system_error with its error code) raised by bind().
           throw;
         }
         // start listening
         my->acceptor->listen();
         ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count));
         // accept incoming connections and process the messages they send
         my->start_listen_loop();
      }

等待其他节点的连接

void net_plugin_impl::start_listen_loop( ) {
      //获取单例模式中的io服务,并用其创建一个通信套接字。为什么不重新创建一个io服务?   
      auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );
      acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
            if( !ec ) {
               uint32_t visitors = 0;     //统计共有多少个peer_addr变量为非空的连接
               uint32_t from_addr = 0;    //统计所有的连接里面,有几个是当前监听到的连接
               auto paddr = socket->remote_endpoint(ec).address();
               if (ec) {
                  fc_elog(logger,"Error getting remote endpoint: ${m}",("m", ec.message()));
               }
               else {
                  for (auto &conn : connections) {  //遍历当前节点的所有连接
                     if(conn->socket->is_open()) {
                        if (conn->peer_addr.empty()) {
                           visitors++;
                           boost::system::error_code ec;
                           if (paddr == conn->socket->remote_endpoint(ec).address()) {
                              from_addr++;
                           }
                        }
                     }
                  }
                  //修改当前有效连接数
                  if (num_clients != visitors) {
                     ilog ("checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients));
                     num_clients = visitors;
                  }
                  //当前有效连接中不包含 新监听到的连接,则加入到有效连接里面,并启动一个会话
                  if( from_addr < max_nodes_per_host && (max_client_count == 0 || num_clients < max_client_count )) {
                     ++num_clients;
                     connection_ptr c = std::make_shared<connection>( socket );
                     connections.insert( c );
                     start_session( c ); //重要 启动一个会话

                  }
                  else {
                     if (from_addr >= max_nodes_per_host) {
                        fc_elog(logger, "Number of connections (${n}) from ${ra} exceeds limit",
                                ("n", from_addr+1)("ra",paddr.to_string()));
                     }
                     else {
                        fc_elog(logger, "Error max_client_count ${m} exceeded",
                                ( "m", max_client_count) );
                     }
                     socket->close( );
                  }
               }
            } else {
               elog( "Error accepting connection: ${m}",( "m", ec.message() ) );
               // For the listed error codes below, recall start_listen_loop()
               switch (ec.value()) {
                  case ECONNABORTED:
                  case EMFILE:
                  case ENFILE:
                  case ENOBUFS:
                  case ENOMEM:
                  case EPROTO:
                     break;
                  default:
                     return;
               }
            }
            //继续等待下一个连接
            start_listen_loop();
         });
   }

当接收到一个有效连接之后,调用start_session方法开启一个会话。参数c是封装了已接受套接字的connection对象的智能指针,用来与对端节点收发数据。随后在异步回调的末尾再次调用start_listen_loop,继续等待下一个连接(这是重新挂起异步accept,而不是真正意义上的递归)。start_session方法内部主要调用start_read_message( con )来读取并处理消息,所以我们需要重点查看start_read_message( con )函数。这里con和c指向的是同一个connection对象。

// Reads from conn's socket asynchronously and dispatches every complete message
// that has accumulated in conn->pending_message_buffer, then calls itself again
// to wait for more data.
//
// Framing as parsed below: each message is a message_header_size-byte length
// prefix followed by that many payload bytes. A length of 0 or larger than
// def_send_buffer_size*2 closes the connection.
//
// conn->outstanding_read_bytes holds how many more bytes are needed to complete
// the current header or message; when it is empty, a full header is requested.
// Read errors and exceptions close the connection.
void net_plugin_impl::start_read_message( connection_ptr conn ) {

      try {
         if(!conn->socket) {
            return;
         }
         // The read handler keeps only a weak reference, so a pending read does
         // not by itself keep the connection object alive.
         connection_wptr weak_conn = conn;
         // Wait for at least the bytes still missing from a partial message, or
         // for a full header when nothing is outstanding.
         std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size;

         // Optional (use_socket_read_watermark): set the socket's receive low
         // watermark to minimum_read, capped at 4096 bytes.
         if (use_socket_read_watermark) {
            const size_t max_socket_read_watermark = 4096;
            std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
            boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
            conn->socket->set_option(read_watermark_opt);
         }

         // Completion condition for async_read: keep reading until at least
         // minimum_read bytes have arrived or an error occurs.
         auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
            if (ec || bytes_transferred >= minimum_read ) {
               return 0;
            } else {
               return minimum_read - bytes_transferred;
            }
         };
         // Read directly into the writable region of pending_message_buffer.
         boost::asio::async_read(*conn->socket,
            conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler,
            [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
               auto conn = weak_conn.lock();
               if (!conn) {
                  return;   // connection already destroyed
               }

               conn->outstanding_read_bytes.reset();

               try {
                  if( !ec ) {
                     // The read must not report more bytes than the buffer had room for.
                     if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
                        elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
                             ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
                     }
                     EOS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");
                     // Commit the newly received bytes so they become readable.
                     conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
                     // Dispatch as many complete messages as the buffer currently holds.
                     while (conn->pending_message_buffer.bytes_to_read() > 0) {
                        uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();

                        if (bytes_in_buffer < message_header_size) {
                           // Partial header: ask for the rest of it on the next read.
                           conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer);
                           break;
                        } else {
                           // Peek the length prefix without consuming it.
                           uint32_t message_length;
                           auto index = conn->pending_message_buffer.read_index();
                           conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
                           if(message_length > def_send_buffer_size*2 || message_length == 0) {
                              boost::system::error_code ep_ec;   // distinct from the read result `ec`
                              elog("incoming message length unexpected (${i}), from ${p}", ("i", message_length)("p",boost::lexical_cast<std::string>(conn->socket->remote_endpoint(ep_ec))));
                              close(conn);
                              return;
                           }

                           auto total_message_bytes = message_length + message_header_size;

                           if (bytes_in_buffer >= total_message_bytes) {
                              // Whole message available: skip the header and dispatch the payload.
                              conn->pending_message_buffer.advance_read_ptr(message_header_size);
                              if (!conn->process_next_message(*this, message_length)) {
                                 return;   // process_next_message already closed the connection
                              }
                           } else {
                              // Partial message: make room for the remainder and
                              // request exactly the missing bytes on the next read.
                              auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
                              auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
                              if (outstanding_message_bytes > available_buffer_bytes) {
                                 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
                              }

                              conn->outstanding_read_bytes.emplace(outstanding_message_bytes);
                              break;
                           }
                        }
                     }
                     start_read_message(conn);
                  } else {
                     auto pname = conn->peer_name();
                     if (ec.value() != boost::asio::error::eof) {
                        elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) );
                     } else {
                        ilog( "Peer ${p} closed connection",("p",pname) );
                     }
                     close( conn );
                  }
               }
               catch(const std::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog("Exception in handling read data from ${p} ${s}",("p",pname)("s",ex.what()));
                  close( conn );
               }
               catch(const fc::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  // Include ${p}: the peer name was passed but never rendered before.
                  elog("Exception in handling read data from ${p} ${s}", ("p",pname)("s",ex.to_string()));
                  close( conn );
               }
               catch (...) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog( "Undefined exception hanlding the read data from connection ${p}",( "p",pname));
                  close( conn );
               }
            } );
      } catch (...) {
         string pname = conn ? conn->peer_name() : "no connection name";
         elog( "Undefined exception handling reading ${p}",("p",pname) );
         close( conn );
      }
   }

处理接收到的数据的函数比较长,基本重要的地方都标注了注释。该函数对接收数据的处理,主要是循环接收数据,并识别为要处理的消息类型,在eos系统下,通信的消息类型共分为如下几种,每种消息重载了一个handle_message函数来处理。在process_next_message函数里面进行消息分发。

handshake_message, 握手消息类型
chain_size_message, 未使用
go_away_message, 退出连接消息类型
time_message, 时间戳消息类型
notice_message, 通知消息类型,在区块同步中,该类型包含了区块状态等信息
request_message, 按ID列表请求区块或交易(区块同步过程中使用)
sync_request_message, 请求同步指定区块号范围内的区块
signed_block, 区块详细数据
packed_transaction 打包交易

// Unpacks the next net_message (message_length bytes, starting at the read
// pointer of pending_message_buffer) and dispatches it through msgHandler to the
// net_plugin_impl::handle_message overload for its type.
//
// For a signed_block, the raw serialized message is first copied into
// blk_buffer, before unpacking, so it can be cached. No signature is checked here.
//
// Returns false, after closing the connection, if an fc::exception is thrown
// while unpacking or handling; returns true otherwise.
bool connection:: process_next_message(net_plugin_impl& impl, uint32_t message_length) {
      try {
         // If it is a signed_block, then save the raw message for the cache
         // This must be done before we unpack the message.
         // The static_variant tag is a varint (7 data bits per byte, high bit set
         // while more bytes follow); it is only peeked here, not consumed.
         // This code is copied from fc::io::unpack(..., unsigned_int)
         auto index = pending_message_buffer.read_index();
         uint64_t which = 0; char b = 0; uint8_t by = 0;
         do {
            pending_message_buffer.peek(&b, 1, index);
            which |= uint32_t(uint8_t(b) & 0x7f) << by;
            by += 7;
         } while( uint8_t(b) & 0x80 && by < 32);

         if (which == uint64_t(net_message::tag<signed_block>::value)) {
            // Copy the complete raw message, starting again from the read pointer
            // (a fresh index, not the one used to decode the tag above).
            blk_buffer.resize(message_length);
            auto blk_index = pending_message_buffer.read_index();
            pending_message_buffer.peek(blk_buffer.data(), message_length, blk_index);
         }
         auto ds = pending_message_buffer.create_datastream();
         net_message msg;
         fc::raw::unpack(ds, msg);
         msgHandler m(impl, shared_from_this() );
         msg.visit(m);   // invokes impl.handle_message(...) for the concrete message type
      } catch(  const fc::exception& e ) {
         edump((e.to_detail_string() ));
         impl.close( shared_from_this() );
         return false;
      }
      return true;
   }

重载的消息处理函数(具体发送的数据类型,下一篇笔记再详细写,目前还没有调试明白):

      // Per-message-type handlers (excerpt). process_next_message() unpacks a
      // net_message and, via msg.visit(msgHandler), calls the overload matching
      // the message's concrete type. c is the connection the message arrived on.
      void handle_message( connection_ptr c, const notice_message &msg);
      void handle_message( connection_ptr c, const request_message &msg);
      void handle_message( connection_ptr c, const sync_request_message &msg);
      void handle_message( connection_ptr c, const signed_block &msg);
      void handle_message( connection_ptr c, const packed_transaction &msg);

除了监听等待连接之外,该插件启动后也会向其他节点发送数据,发送数据部分:

      my->start_monitors();

      for( auto seed_node : my->supplied_peers ) {
         connect( seed_node );
      }

      if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end())
         logger = fc::get_logger_map()[logger_name];

start_monitors启动两个监控,监控新加入的连接,监控过期的交易,并移除(此种方式没有太理解,后面分析到chain插件的时候,再回过头来看)。

void net_plugin_impl::start_monitors() {
      connector_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      transaction_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      start_conn_timer(connector_period, std::weak_ptr<connection>());
      start_txn_timer();
   }

之后通过for循环连接各个seed节点。net_plugin_impl的connect函数有两个重载版本:第一个以connection对象为参数,负责解析对端的"host:port"地址;第二个以解析得到的endpoint迭代器为参数,实现具体的异步连接逻辑。

   /**
    * First connect() overload: splits c->peer_addr ("host:port"), resolves it
    * asynchronously and, on success, passes the resolved endpoints to
    * connect( c, endpoint_itr ).
    *
    * Does nothing if the connection carries a go_away reason (c->no_retry).
    * If peer_addr has no ':' or starts with ':', logs an error and then resets,
    * closes and erases the first entry of `connections` with the same peer_addr.
    * NOTE(review): only IPv4 is resolved, and an IPv6 literal would be split at
    * its first ':'.
    */
   void net_plugin_impl::connect( connection_ptr c ) {
      if( c->no_retry != go_away_reason::no_reason) {
         fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( c->no_retry )));
         return;
      }

      const auto colon = c->peer_addr.find(':');
      const bool malformed_addr = (colon == 0 || colon == std::string::npos);
      if (malformed_addr) {
         elog ("Invalid peer address. must be \"host:port\": ${p}", ("p",c->peer_addr));
         for ( connection_ptr existing : connections ) {
            if (existing->peer_addr != c->peer_addr) {
               continue;
            }
            existing->reset();          // connection::reset(), not shared_ptr::reset()
            close(existing);
            connections.erase(existing);
            break;
         }
         return;
      }

      const auto host = c->peer_addr.substr( 0, colon );
      const auto port = c->peer_addr.substr( colon + 1 );
      idump((host)(port));
      // Note: need to add support for IPv6 too
      tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
      connection_wptr weak_conn = c;
      // Resolve the peer address asynchronously; the handler holds only a weak reference.
      resolver->async_resolve( query,
         [weak_conn, this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ) {
            auto conn = weak_conn.lock();
            if (!conn) return;
            if (err) {
               elog( "Unable to resolve ${peer_addr}: ${error}",
                     (  "peer_addr", conn->peer_name() )("error", err.message() ) );
               return;
            }
            connect( conn, endpoint_itr );
         });
   }
  // Second connect() overload: asynchronously connects c's socket to the
  // endpoint at endpoint_itr. On success a session is started and, if that
  // succeeds, our handshake is sent. On failure the next resolved endpoint is
  // tried; when none remain, the failure is logged and the connection closed.
  // Does nothing (besides a debug log) if the connection carries a go_away reason.
   void net_plugin_impl::connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
      if( c->no_retry != go_away_reason::no_reason) {
         // Previously computed the reason string into an unused local; log it
         // the same way the first connect() overload does.
         fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( c->no_retry )));
         return;
      }
      auto current_endpoint = *endpoint_itr;
      ++endpoint_itr;
      c->connecting = true;
      connection_wptr weak_conn = c;
      c->socket->async_connect( current_endpoint, [weak_conn, endpoint_itr, this] ( const boost::system::error_code& err ) {
            auto c = weak_conn.lock();
            if (!c) return;
            if( !err && c->socket->is_open() ) {
               if (start_session( c )) {
                  // Connected: the handshake is the first message we send to the peer.
                  c->send_handshake ();
               }
            } else {
               if( endpoint_itr != tcp::resolver::iterator() ) {
                  // More endpoints were resolved for this peer: close the failed
                  // attempt and try the next one.
                  close(c);
                  connect( c, endpoint_itr );
               }
               else {
                  elog( "connection failed to ${peer}: ${error}",
                        ( "peer", c->peer_name())("error",err.message()));
                  c->connecting = false;
                  close(c);   // same impl as `this`; consistent with the branch above
               }
            }
         } );
   }

//握手消息报文结构
// Handshake message; connect() sends one via send_handshake() right after a
// successful connection and session start.
struct handshake_message {
   uint16_t                   network_version = 0; ///< incremental value above a computed base
   chain_id_type              chain_id; ///< used to identify chain
   fc::sha256                 node_id; ///< used to identify peers and prevent self-connect
   chain::public_key_type     key; ///< authentication key; may be a producer or peer key, or empty
   tstamp                     time;
   fc::sha256                 token; ///< digest of time to prove we own the private key of the key above
   chain::signature_type      sig; ///< signature for the digest
   string                     p2p_address;
   uint32_t                   last_irreversible_block_num = 0;
   block_id_type              last_irreversible_block_id;
   uint32_t                   head_num = 0;
   block_id_type              head_id;
   string                     os;
   string                     agent;
   // Default to 0 like the other scalar fields, so a default-constructed
   // message never carries an indeterminate value.
   int16_t                    generation = 0;
};

3、net_plugin类的plugin_shutdown方法(停止)

plugin_shutdown方法主要功能是关闭监听接收器,循环关闭每个连接,释放资源,代码较少,如下所示:

   /**
    * Stops the plugin: sets done, closes the listening acceptor (if any),
    * closes every connection and finally destroys the acceptor.
    *
    * Connections are closed whether or not an acceptor exists, so a node
    * without a listener still closes its outbound peer connections.
    * Exceptions are captured and rethrown by FC_CAPTURE_AND_RETHROW.
    */
   void net_plugin::plugin_shutdown() {
      try {
         ilog( "shutdown.." );
         my->done = true;
         if( my->acceptor ) {
            ilog( "close acceptor" );
            my->acceptor->close();
         }

         ilog( "close ${s} connections",( "s",my->connections.size()) );
         // Iterate over a copy in case close() modifies my->connections.
         auto cons = my->connections;
         for( const auto& con : cons ) {
            my->close( con);
         }

         if( my->acceptor ) {
            my->acceptor.reset(nullptr);
         }
         ilog( "exit shutdown" );
      }
      FC_CAPTURE_AND_RETHROW()
   }

转载自:
https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%B8%89%EF%BC%89net_plugin%E6%8F%92%E4%BB%B6%E8%AF%A6%E7%BB%86%E5%88%86%E6%9E%90.md