您正在查看: Surou 发布的文章

net_plugin插件代码分析

上一篇笔记中,写了nodeos的执行流程。nodeos使用的是一种插件体系,业务代码分布在一个个的插件类中,然后分析了插件类共的继承关系。本篇笔记,就从其中的net插件入手来分析p2p模块的功能,也就是分析学习net_plugin_impl类。同之前的笔记一样,从net插件的生命周期,初始化、启动、停止来分析,并重点分析p2p模块的运行状态。
一个区块链系统的p2p模块,应该包括以下几个职能:
(1)、从对等的节点那里,同步区块数据。
(2)、发送交易给其他节点进行验证。
(3)、验证其他节点发送过来的交易。
(3)、如果自己生产的区块,要发送区块给其他节点。
(4)、验证其他节点发送过来的区块。

1、net_plugin类的plugin_initialize方法(初始化)

此方法主要是用来使用命令行参数或者配置文件中的参数来配置net_plugin_impl类,该插件的业务主要在net_plugin_impl类中实现。下图为执行完plugin_initialize方法后的net_plugin_impl类对象
net_plugin_impl初始化之后

 // 读取配置信息,初始化net_plugin_imul 对象的成员变量   
         peer_log_format = options.at( "peer-log-format" ).as<string>();

         my->network_version_match = options.at( "network-version-match" ).as<bool>();

         my->sync_master.reset( new sync_manager( options.at( "sync-fetch-span" ).as<uint32_t>()));
         my->dispatcher.reset( new dispatch_manager );

         my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as<int>());
         my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as<int>();
         my->txn_exp_period = def_txn_expire_wait;
         my->resp_expected_period = def_resp_expected_wait;
         my->dispatcher->just_send_it_max = options.at( "max-implicit-request" ).as<uint32_t>();
         my->max_client_count = options.at( "max-clients" ).as<int>();
         my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as<int>();
         my->num_clients = 0;
         my->started_sessions = 0;
        ······
         my->keepalive_timer.reset( new boost::asio::steady_timer( app().get_io_service()));
         my->ticker();        //定时器,给每个连接发送时间戳`

plugin_initialize函数主要是初始化net_plugin_impl对象。并每隔32s给连接的节点发送心跳数据(时间戳数据),其中send_time发送是消息类型为该模块下定义的几种类型之一。

   void net_plugin_impl::ticker() {
      keepalive_timer->expires_from_now (keepalive_interval);
      keepalive_timer->async_wait ([this](boost::system::error_code ec) {
            ticker ();
            if (ec) {
               wlog ("Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()));
            }
            for (auto &c : connections ) {
               if (c->socket->is_open()) {
                  c->send_time();  //遍历所有的连接,给每个连接定时发送时间戳message
               }
            }
         });
   }

2、net_plugin类的plugin_startup方法(启动运行)

plugin_startup方法是核心方法,包含了网络监听循环、接收数据处理、发送数据等内容。
等待连接部分:绑定、监听,在start_listen_loop函数里,等待其他节点的连接。通过boost::asio实现异步IO,不会阻塞。

      if( my->acceptor ) {
         //使用tcp:v4的协议 打开acceptor接收器   
         my->acceptor->open(my->listen_endpoint.protocol());
         //设置地址复用 Address already in use
         my->acceptor->set_option(tcp::acceptor::reuse_address(true));
         try {
           //绑定 
           my->acceptor->bind(my->listen_endpoint);
         } catch (const std::exception& e) {
           ilog("net_plugin::plugin_startup failed to bind to port ${port}",
             ("port", my->listen_endpoint.port()));
           throw e;
         }
         //监听
         my->acceptor->listen();
         ilog("starting listener, max clients is ${mc}",("mc",my->max_client_count));
         //接受连接 并处理发送过来的消息
         my->start_listen_loop();
      }

等待其他节点的连接

void net_plugin_impl::start_listen_loop( ) {
      //获取单例模式中的io服务,并用其创建一个通信套接字。为什么不重新创建一个io服务?   
      auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );
      acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
            if( !ec ) {
               uint32_t visitors = 0;     //统计共有多少个peer_addr变量为非空的连接
               uint32_t from_addr = 0;    //统计所有的连接里面,有几个是当前监听到的连接
               auto paddr = socket->remote_endpoint(ec).address();
               if (ec) {
                  fc_elog(logger,"Error getting remote endpoint: ${m}",("m", ec.message()));
               }
               else {
                  for (auto &conn : connections) {  //遍历当前节点的所有连接
                     if(conn->socket->is_open()) {
                        if (conn->peer_addr.empty()) {
                           visitors++;
                           boost::system::error_code ec;
                           if (paddr == conn->socket->remote_endpoint(ec).address()) {
                              from_addr++;
                           }
                        }
                     }
                  }
                  //修改当前有效连接数
                  if (num_clients != visitors) {
                     ilog ("checking max client, visitors = ${v} num clients ${n}",("v",visitors)("n",num_clients));
                     num_clients = visitors;
                  }
                  //当前有效连接中不包含 新监听到的连接,则加入到有效连接里面,并启动一个会话
                  if( from_addr < max_nodes_per_host && (max_client_count == 0 || num_clients < max_client_count )) {
                     ++num_clients;
                     connection_ptr c = std::make_shared<connection>( socket );
                     connections.insert( c );
                     start_session( c ); //重要 启动一个会话

                  }
                  else {
                     if (from_addr >= max_nodes_per_host) {
                        fc_elog(logger, "Number of connections (${n}) from ${ra} exceeds limit",
                                ("n", from_addr+1)("ra",paddr.to_string()));
                     }
                     else {
                        fc_elog(logger, "Error max_client_count ${m} exceeded",
                                ( "m", max_client_count) );
                     }
                     socket->close( );
                  }
               }
            } else {
               elog( "Error accepting connection: ${m}",( "m", ec.message() ) );
               // For the listed error codes below, recall start_listen_loop()
               switch (ec.value()) {
                  case ECONNABORTED:
                  case EMFILE:
                  case ENFILE:
                  case ENOBUFS:
                  case ENOMEM:
                  case EPROTO:
                     break;
                  default:
                     return;
               }
            }
            //继续等待下一个连接
            start_listen_loop();
         });
   }

当接收到一个有效连接之后,开启一个会话,调用start_session方法,参数c为接受连接的套接字的指针,用来与连接到的节点收发数据。然后不断递归调用,接收下一个连接。其中start_session方法内部,主要是调用start_read_message( con )方法来处理消息的。所以我们需要重点查看start_read_message( con )函数。con和c指向的是同一个套接字。

void net_plugin_impl::start_read_message( connection_ptr conn ) {

      try {
         if(!conn->socket) {        //验证套接字是否有效
            return;
         }
         connection_wptr weak_conn = conn;      //当前connection对象的一个weak_ptr指针
            // 读取会递归调用,第一次读取时 outstanding_read_bytes未初始化,message_header_size初始化为4
         std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size;

         if (use_socket_read_watermark) {             //一种读取方式,根据node启动时的配置,水印优化读取??? 默认未开启
            const size_t max_socket_read_watermark = 4096;
            std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
            boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
            conn->socket->set_option(read_watermark_opt);
         }

         auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
            if (ec || bytes_transferred >= minimum_read ) {
               return 0;
            } else {
               return minimum_read - bytes_transferred;
            }
         };
            //异步读取数据  pending_message_buffer为缓冲区
         boost::asio::async_read(*conn->socket,
            conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler,
            [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
               auto conn = weak_conn.lock();    //智能指针是否释放了
               if (!conn) {
                  return;
               }

               conn->outstanding_read_bytes.reset();   //重置outstanding_read_bytes 表示字节数

               try {
                  if( !ec ) {
                        //读取的字节数 大于可写入的字节数 错误
                     if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
                        elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",
                             ("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
                     }
                     EOS_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write(), plugin_exception, "");
                     // 根据读取的字节数,扩展buffer
                     conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
                     while (conn->pending_message_buffer.bytes_to_read() > 0) { // buffer里面可读的字节数
                        uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
                        //如果buffer里面的字节数小于 4个字节
                        if (bytes_in_buffer < message_header_size) {
                           conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer);
                           break;
                        } else {
                           uint32_t message_length;
                           // 返回当前读取的指针位置
                           auto index = conn->pending_message_buffer.read_index();
                           // 返回消息长度
                           conn->pending_message_buffer.peek(&message_length, sizeof(message_length), index);
                           // 消息长度过长或为0
                           if(message_length > def_send_buffer_size*2 || message_length == 0) {
                              boost::system::error_code ec;
                              elog("incoming message length unexpected (${i}), from ${p}", ("i", message_length)("p",boost::lexical_cast<std::string>(conn->socket->remote_endpoint(ec))));
                              close(conn);
                              return;
                           }

                           auto total_message_bytes = message_length + message_header_size;
                              //读取完一条消息
                           if (bytes_in_buffer >= total_message_bytes) {
                              conn->pending_message_buffer.advance_read_ptr(message_header_size);
                              if (!conn->process_next_message(*this, message_length)) {
                                 return;
                              }
                           } else {
                                 //未读取到某个类型消息结尾 循环重新读取
                              auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
                              auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
                              if (outstanding_message_bytes > available_buffer_bytes) {
                                 conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
                              }

                              conn->outstanding_read_bytes.emplace(outstanding_message_bytes);
                              break;
                           }
                        }
                     }
                     start_read_message(conn);
                  } else {
                     auto pname = conn->peer_name();
                     if (ec.value() != boost::asio::error::eof) {
                        elog( "Error reading message from ${p}: ${m}",("p",pname)( "m", ec.message() ) );
                     } else {
                        ilog( "Peer ${p} closed connection",("p",pname) );
                     }
                     close( conn );
                  }
               }
               catch(const std::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog("Exception in handling read data from ${p} ${s}",("p",pname)("s",ex.what()));
                  close( conn );
               }
               catch(const fc::exception &ex) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog("Exception in handling read data ${s}", ("p",pname)("s",ex.to_string()));
                  close( conn );
               }
               catch (...) {
                  string pname = conn ? conn->peer_name() : "no connection name";
                  elog( "Undefined exception hanlding the read data from connection ${p}",( "p",pname));
                  close( conn );
               }
            } );
      } catch (...) {
         string pname = conn ? conn->peer_name() : "no connection name";
         elog( "Undefined exception handling reading ${p}",("p",pname) );
         close( conn );
      }
   }

处理接收到的数据的函数比较长,基本重要的地方都标注了注释。该函数对接收数据的处理,主要是循环接收数据,并识别为要处理的消息类型,在eos系统下,通信的消息类型共分为如下几种,每种消息重载了一个handle_message函数来处理。在process_next_message函数里面进行消息分发。

handshake_message, 握手消息类型
chain_size_message, 未使用
go_away_message, 退出连接消息类型
time_message, 时间戳消息类型
notice_message, 通知消息类型,在区块同步中,该类型包含了区块状态等信息
request_message, 同步区块
sync_request_message, 同步区块
signed_block, 区块详细数据
packed_transaction 打包交易

bool connection:: process_next_message(net_plugin_impl& impl, uint32_t message_length) {
      try {
         // If it is a signed_block, then save the raw message for the cache
         // This must be done before we unpack the message.
         // This code is copied from fc::io::unpack(..., unsigned_int)
         auto index = pending_message_buffer.read_index();
         uint64_t which = 0; char b = 0; uint8_t by = 0;
         do {
            pending_message_buffer.peek(&b, 1, index);
            which |= uint32_t(uint8_t(b) & 0x7f) << by;
            by += 7;
         } while( uint8_t(b) & 0x80 && by < 32);  //如果是block,是签名的,需要先验证签名,再解压,其他消息类型随意。

         if (which == uint64_t(net_message::tag<signed_block>::value)) { // 验证签名 读取下一个消息
            blk_buffer.resize(message_length);
            auto index = pending_message_buffer.read_index();
            pending_message_buffer.peek(blk_buffer.data(), message_length, index);
         }
         auto ds = pending_message_buffer.create_datastream();
         net_message msg;
         fc::raw::unpack(ds, msg);              //解压缩message消息 
         msgHandler m(impl, shared_from_this() );
         msg.visit(m);        //调用的是net_plugin_impl 的成员函数handle_message
      } catch(  const fc::exception& e ) {
         edump((e.to_detail_string() ));
         impl.close( shared_from_this() );
         return false;
      }
      return true;
   }

重载的消息处理函数(具体发送的数据类型,下一篇笔记在详细写,目前还没有调试明白)

      void handle_message( connection_ptr c, const notice_message &msg);
      void handle_message( connection_ptr c, const request_message &msg);
      void handle_message( connection_ptr c, const sync_request_message &msg);
      void handle_message( connection_ptr c, const signed_block &msg);
      void handle_message( connection_ptr c, const packed_transaction &msg);

除了监听等待连接之外,该插件启动后也会向其他节点发送数据,发送数据部分:

      my->start_monitors();

      for( auto seed_node : my->supplied_peers ) {
         connect( seed_node );
      }

      if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end())
         logger = fc::get_logger_map()[logger_name];

start_monitors启动两个监控,监控新加入的连接,监控过期的交易,并移除(此种方式没有太理解,后面分析到chain插件的时候,再回过头来看)。

void net_plugin_impl::start_monitors() {
      connector_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      transaction_check.reset(new boost::asio::steady_timer( app().get_io_service()));
      start_conn_timer(connector_period, std::weak_ptr<connection>());
      start_txn_timer();
   }

之后for循环,连接到seed节点。connect函数重载了两个,一个接受节点信息为参数,另一个实现具体业务逻辑。

   void net_plugin_impl::connect( connection_ptr c ) {
      if( c->no_retry != go_away_reason::no_reason) {
         fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( c->no_retry )));
         return;
      }

      auto colon = c->peer_addr.find(':');

      if (colon == std::string::npos || colon == 0) {
         elog ("Invalid peer address. must be \"host:port\": ${p}", ("p",c->peer_addr));
         for ( auto itr : connections ) {
            if((*itr).peer_addr == c->peer_addr) {
               (*itr).reset();
               close(itr);
               connections.erase(itr);
               break;
            }
         }
         return;
      }

      auto host = c->peer_addr.substr( 0, colon );
      auto port = c->peer_addr.substr( colon + 1);
      idump((host)(port));
      tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
      connection_wptr weak_conn = c;
      // Note: need to add support for IPv6 too
    //异步解析seed节点
      resolver->async_resolve( query,
                               [weak_conn, this]( const boost::system::error_code& err,
                                          tcp::resolver::iterator endpoint_itr ){
                                  auto c = weak_conn.lock();
                                  if (!c) return;
                                  if( !err ) {
                                     connect( c, endpoint_itr );  //调用重载函数,实现内部逻辑
                                  } else {
                                     elog( "Unable to resolve ${peer_addr}: ${error}",
                                           (  "peer_addr", c->peer_name() )("error", err.message() ) );
                                  }
                               });
   }
  //重载之后的connect,实现了异步连接到其他节点
   void net_plugin_impl::connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
      if( c->no_retry != go_away_reason::no_reason) {
         string rsn = reason_str(c->no_retry);
         return;
      }
      auto current_endpoint = *endpoint_itr;
      ++endpoint_itr;
      c->connecting = true;
      connection_wptr weak_conn = c;
      c->socket->async_connect( current_endpoint, [weak_conn, endpoint_itr, this] ( const boost::system::error_code& err ) {
            auto c = weak_conn.lock();
            if (!c) return;
            if( !err && c->socket->is_open() ) {
               if (start_session( c )) {
                  c->send_handshake ();   //连接上之后,给其他节点发送握手消息,可见握手消息是非常重要的,下一步重要调试握手消息的报文内容
               }
            } else {
               if( endpoint_itr != tcp::resolver::iterator() ) {
                  close(c);
                  connect( c, endpoint_itr );
               }
               else {
                  elog( "connection failed to ${peer}: ${error}",
                        ( "peer", c->peer_name())("error",err.message()));
                  c->connecting = false;
                  my_impl->close(c);
               }
            }
         } );
   }

//握手消息报文结构
struct handshake_message {
uint16_t network_version = 0; ///< incremental value above a computed base
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
chain::public_key_type key; ///< authentication key; may be a producer or peer key, or empty
tstamp time;
fc::sha256 token; ///< digest of time to prove we own the private key of the key above
chain::signature_type sig; ///< signature for the digest
string p2p_address;
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
uint32_t head_num = 0;
block_id_type head_id;
string os;
string agent;
int16_t generation;
};

3、net_plugin类的plugin_shutdown方法(停止)

plugin_shutdown方法主要功能是关闭监听接收器,循环关闭每个连接,释放资源,代码较少,如下所示:

   void net_plugin::plugin_shutdown() {
      try {
         ilog( "shutdown.." );
         my->done = true;
         if( my->acceptor ) {
            ilog( "close acceptor" );
            my->acceptor->close();

            ilog( "close ${s} connections",( "s",my->connections.size()) );
            auto cons = my->connections;
            for( auto con : cons ) {
               my->close( con);
            }

            my->acceptor.reset(nullptr);
         }
         ilog( "exit shutdown" );
      }
      FC_CAPTURE_AND_RETHROW()
   }

转载自:
https://github.com/RootkitKiller/EosLearn/blob/master/Eos%E4%BB%A3%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%EF%BC%88%E4%B8%89%EF%BC%89net_plugin%E6%8F%92%E4%BB%B6%E8%AF%A6%E7%BB%86%E5%88%86%E6%9E%90.md

EOS区块生产和区块同步

1 概述

本文所述基于EOSv1.2.3。
EOS区块生产和同步主要涉及共识算法DPOS和aBFT,其源码实现主要涉及chain_plugin、producer_plugin、net_plugin和controller4个模块以及eosio.system智能合约等。

2 共识算法

EOS的区块生产,遵循DPoS(Delegated Proof-of-Stake)机制。
简单来说,所有拥有EOS token的人都是EOS区块生产的参与者。
任何人都可以申请出块。
任何人都可以选择不直接完成出块工作,而是将自己所持token抵押给出块申请者(PoS),委托(Delegate)他们完成出块工作。
最终,按照token比例,选出前21名出块者(BP,Block Producer),由他们代理出块。

ps:上述部分,EOS选举主要在eosio.system智能合约中实现,设置生产者队列函数为update_elected_producers()。笔者工作EOS不用于公链而是联盟链,不需要选举,故直接调用eosio.bios中的setprods()函数设置。

21名BP依次轮流出块,不像比特币等,同一时刻所有BP是竞争关系。每个BP轮到自己出块时,连续出块12个,每个块耗时500ms。

ps:上述这一块逻辑在producer_plugin中实现。

至此,我们可以简单的说,EOS的共识机制叫DPoS(More than that)。

EOS中每个区块被生产出来后,需要所有BP的确认,按照BFT共识机制确认后,才会变成不可逆的状态,在此之前都是reversible block。因此,需要进行区块在网络中的同步。
数据一致性是分布式系统数据同步的重要话题,BFT(Byzantine fault tolerance)是其中的一种代表共识机制,或者称它为算法。

如上图所示,BFT共识机制主要有以下步骤:

  • 提案1个block;
  • 进入Pre-commitment阶段,所有BP确认该提案;
  • 进入Commitment阶段,所有BP收到2/3+1或更多Pre-commit后,发送Commit;
  • 当某个BP收到2/3+1或更多Commit后,该block即不可逆。
  • 该算法适用于恶意节点不超过1/3的场景,可以保证达到最终一致性。
至此,我们可以说:EOS的共识机制叫DPoS & BFT。

BFT算法对每个区块需要发送至少2条消息,EOS对此进行了优化。BM将其称为pipelined:管道式、流水线式。
即每次生产一个区块,由于本身就需要将该区块广播到p2p网络中供其他节点同步,因此将Pre-Commit和Commit数据与该区块数据一并广播出去。
如下图所示:

上半部分描述了区块number不可逆的过程,下半部分描述了每个区块由谁Pre-Commit,由谁Commit。
假设有ABCD四个BP,每个BP每次出1个区块。BFT要求2/3+1个节点确认,即2/3*4+1=3。
假设每轮都是按照A、B、C、D的顺序出块。

  • A 出块1,Pre-Commit不可逆块为0,Commit不可逆块为0
  • B 出块2,Pre-Commit不可逆块为0,Commit不可逆块为0
  • C 出块3,ABC3个节点Pre-Commit区块1,故Pre-Commit不可逆块为1,Commit不可逆块为0
  • D 出块4,BCD3个节点Pre-Commit区块2,故Pre-Commit不可逆块为2,Commit不可逆块为0
  • A 出块5,CDA3个节点Pre-Commit区块3,故Pre-Commit不可逆块为3,CDA3个节点Commit区块1,故Commit不可逆块为1
  • B 出块6,DAB3个节点Pre-Commit区块4,故Pre-Commit不可逆块为4,DAB3个节点Commit区块2,故Commit不可逆块为2
  • 以此类推

基于上述优化,p2p网络的带宽压力、区块哈希验证频次等大幅降低。
但带来的问题是,出块、不可逆糅合在了一起,而非并行的。
BM的解释是:虽然如此,相对其它平台(比特币,以太坊)的机制来说,一个区块从生产出来到变为不可逆,其时间不算太久,且实际上每两个不可逆块之间的间隙时间非常短。长远考虑,只有这样,才能实现更好的扩展性。

原文地址:DPOS BFT— Pipelined Byzantine Fault Tolerance

至此,我们应当说:EOS的共识机制叫 DPOS BFT— Pipelined Byzantine Fault Tolerance。

3 chain_plugin

chain_plugin插件主要功能为:

  • 检查启动参数,判断是否需要replay区块链
  • 初始化和拉起Controller模块
  • 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥
  • 向用户提供链的其它set/get接口,诸如:get_account、get_transaction_id……

下图为chain_plugin核心代码逻辑

3.1 检查启动参数,判断是否需要replay()区块链

chain_plugin启动前,先调用plugin_initialize()函数,该函数检查启动参数。除了一些基本配置外,还检查replay相关参数:

  • export-reversible-blocks:会将原来的reversible_block进行导出备份,该参数单独优先检查
  • delete-all-blocks:删除所有旧的blocks,重头开始replay区块链,包括清空State DB和Block log
  • hard-replay-blockchain:清空State DB,如果有配置truncate-at-block,按照[first,truncate-at-block]区间,否则[first,end]区间重新加载Block log中的区块,即该区间的区块仍然生效不replay,其它区块全部replay。如果reversible db有数据,一并尝试恢复
  • replay-blockchain:清空State DB,如果有配置fix-reversible_blocks,则尝试恢复reversible DB
  • fix-reversible_blocks:尝试恢复reversible DB
  • import-reversible_blocks:旧的不要,用导入的替代

3.2 初始化和拉起Controller模块

当plugin_initialize()上述操作处理完后,根据最终配置参数,对Controller模块进行初始化(emplace()-->构造函数),然后在调用plugin_startup()时调用了Controller::startup,完成了Controller模块的启动

3.3 提供相关信号、方法,主要用于给controller、net_plugin、producer_plugin、用户输入等架桥

  • accepted_block:当net_plugin收到一个区块,或者用户手动push的区块(可能仅仅是一个接口,该场景应当不常见),chain_plugin通过该方法转发区块到producer_plugin中
  • accepted-transactions:同理。需注意,cleos/http的push transaction和push action等,均在此处入链。

3.4 向用户提供链的其它set/get接口

实现了http接口中的一部分接口。

4 producer_plugin

producer_plugin插件主要功能为:

  • 当本节点处于生产阶段时,与chain_plugin::controller交互,调用controller相关接口进行区块生产;
  • 当本节点处于同步阶段时,接收net_plugin插件收到的区块,调用controller相关接口进行区块同步。

producer_plugin核心函数有:

  • schedule_production_loop:递归调用,生产区块
  • start_block:初始化当前区块数据
  • produce_block:打包、签名、提交区块入链
  • on_incoming_block:接收net_plugin收到的区块,入本节点链
  • on_incoming_transaction_async:接收net_plugin收到的交易,入本节点链
  • on_block:本节点区块入链成功后,相关状态更新
  • on_irreversible_block:本节点区块不可逆后,相关状态更新

下图为producer_plugin生产、同步区块的核心代码逻辑:

4.1 生产区块

如上所示,producer_plugin启动后,调用schedule_production_loop()函数,该函数是插件的最核心的函数,是一个递归函数,负责无限循环出块。
EOS500ms出一个块,因此需要启动一个定时器_timer,逻辑如下:

  • 关闭之前的定时器:_timer.cancel()
  • 调用start_block()函数初始化新的区块信息。首先调用controller::abort_block()重置数据,然后调用controller::start_block(),根据上一个区块信息生成一个新的区块,最后将由于controller::abort_block()而未来得及入链的交易(unapplied_transactions)重新push_transaction()进新的区块。
  • 重启定时器_timer(),异步等待新区块截止时间,这期间内,EOS等待并接收用户的交易请求。
  • 如果用户有新交易,调用push_transaction()执行交易并保存到区块中。
  • 如果没有其它情况,定时器到期后,调用produce_block(),对该区块进行打包(finalize_block())、签名(sign_block()),然后提交(commit_block())到本节点区块链(fork_db)上,commit_block()会发送accepted_block信号给订阅者,这其中包括producer_plugin::on_block(),该函数进行相关数据更新。fork_db每次新增一个区块,就会检查是否有新的不可逆区块产生,如果有,发送irrerersible_block信号给订阅者,这其中包括producer_plugin::on_irreversible(),该函数进行相关数据更新。最后,再次递归调用schedule_production_loop()进行下一个区块生产。
  • 如果出现其它原因,如收到网络上发送过来的区块,且定时器未到期,则会转入区块同步逻辑,之前的所有执行会被重置,未来得及执行的交易会被备份到unapplied_transactions中。

4.2 同步区块

producer_plugin的主流程是schedule_production_loop(),其中定时器_timer会根据实际情况设置等待时间。如是轮到该节点生产区块,则每次等待的时间为:总时间(500ms)-已用时间。如果未轮到该节点生产区块,则计算下一次出块时间(more than 500ms),并启动定时器等待。
如果net_plugin插件或bnet_plugin插件收到网络上的区块,则需要同步。调用on_incoming_block()函数,该函数内部逻辑与生产区块逻辑类似,主要调用controller的abort_block(),start_block(),push_transaction(),finalize_block(),commit_block()。
注意,http和cleos提供了一些接口,其中包括push_transaction,push_block接口,其逻辑见上图,比较特殊,不做太多解释。
下图给出了区块同步的更详细的说明:

重点做以下解释:

  • fork_db:区块链,其中的区块尚未不可逆,当新块到达后,可能存在分叉。需要根据最长链原则,选出较长的一条链,较短的链被删除。
  • 每个新区块基于其上一个区块出块,根据该信息,可以对旧链fork_db的head block(HB)和新的区块(new HB)分别进行追溯,将旧链中的区块合并到新链上,然后删除旧链,保存新链。主要代码在fetch_branch_from()中实现,思想请参考:拉链法

5 net_plugin

net_plugin的主要功能如下:

  • 与其他节点建立连接;
  • 向网络中广播本节点区块;
  • 接收其他节点广播的区块;
  • 节点之间区块同步。

net_plugin的代码结构如下:

5.1 与其他节点建立连接

见上图:

  • net_plugin插件启动时,根据p2p-listen-endpoint成员变量(配置文件或命令行参数p2p-listen-endpoint参数(默认值0.0.0.0:9876)),调用tcp::acceptor的listen()启动监听,调用start_listen_loop()递归调用async_accept()异步等待网络连接请求(参考socket通信)。
  • 根据supplied_peers成员变量(配置文件或命令行参数p2p-peer-address(可多个)),逐一调用async_connect()进行连接请求。
  • 其他节点收到请求后回复,并调用start_session()建立会话。
  • 本节点收到回复得知成功后,亦调用start_session()建立会话。
  • start_session()包含两步:
  • 调用start_read_message()异步循环等待其他节点的消息;
  • 调用send_handshake()发送握手消息。
  • net_plugin插件中定义了9个消息类型,其中主要消息为:
  • handshake_message:握手消息
  • notice_message: 通知消息,主要在同步时使用,进行同步状态发送
  • sync_request_message:当本节点区块链的HB number小于对方节点LIB number时发送
  • request_message:当本节点区块链的HB number小于对方节点HB number时发送
  • signed_block_message:每个区块由该消息逐一发送

5.2 向网络中广播本节点区块

net_plugin插件在启动时,订阅了chain_plugin插件的accepted_block信号,该信号在区块被提交到本地待确认不可逆数据库(fork_db)中后发送。
收到该信号后,net_plugin插件向所有连接中的网络节点广播signed_block_message。

5.3 接收其他节点广播的区块

其他节点的net_plugin插件的start_read_message异步循环等待网络消息,收到其他节点signed_block_message后,会进行判断,如果本节点没有该区块且该区块合法,则保存到本地fork_db中,如果存在分叉,则按照最长链的原则,尝试进行合并,再启用该最长链。

5.4 节点之间区块同步

每个BP连续出块12个(12*0.5s=6s),每出一个块便立即广播,理想中各节点的区块链是实时同步的,然而由于网络原因,或者后加入的节点,其往往落后其他节点很多区块。因此涉及到大量区块的同步问题。
各节点每次建立连接时会发送 handshake_message ,该消息主要用于区块同步。每次握手进行一次区块同步状态判断,同步完成后会再次发送 handshake_message,循环进行判断。
同步状态有5中场景:

  • [State 0]双方HB id相同,id为数字摘要,如果相同说明HB完全一致,不需要同步,则Alice向Bob发送notice_message。
  • [State 1]如果Alice的区块链非常短,其HB number 竟然没有对方节点LIB number大,则Alice向Bob发送sync_request_message,附带参数(start,end)表示同步区间。其中start为Alice的LIB number,end为Bob的LIB number。
  • [State 2]与State 1相反,如果Alice发现Bob的HB number < Alice的LIB number,则发送notice_message让Bob主动来请求同步数据。
  • [State 3]如果Alice的HB number >Bob 的LIB number,但Alice的HB number < Bob的HB number,则也需要同步,Alice向Bob发送request_message,Bob收到消息后,从Bob的LIB开始,一直到Bob的HB,逐一发送区块。
  • [State 4]与State 3相反,则Alice发送notice_message让Bob主动来请求同步数据。
5.4.1 同步状态0

同步状态1见总图,由于较简单,不再赘述。

5.4.2 同步状态1


如上图所示:

  • Alice向Bob请求区块同步,总区间为[Alice LIB + 1, Bob LIB]。
  • 调用request_next_chunk()对总区间分组同步,默认大小为100个区块一组(配置文件或命令行参数sync-fetch-span)。
  • net_plugin::sync_manager子模块顺序选择一个网络节点,发送消息,并启动定时器_timer异步等待5秒。如果5秒后未收到对方的signed_block_message,则取消该请求(再次发送sync_request_message,但区间为[0,0]),并重新选择一个网络节点发送消息。
  • 如果5秒内收到对方节点的signed_block_message,取消_timer定时器,判断是否是自己想要的区块,保存后判断是否同步结束。
  • 如果尚未同步结束,重启_timer定时器等待;
  • 如果分组同步结束,再次调用request_next_chunk()请求下一个分组区块;
  • 如果总区间同步结束,向所有网络节点再次发送握手消息,继续进行同步状态判断。
  • 对方节点收到sync_request_message后,按请求区块区间,循环逐一发送区块。
5.4.3 同步状态2


如上图所示,Bob收到Alice的通知后,与Alice的同步状态1类型,调用相关函数进行同步。

5.4.4 同步状态3

如上图所示,Alice向Bob请求区块,Bob收到消息后,以区间[Bob LIB+1,BOB HB]循环逐一发送区块。
如果Bob的HB number 为0,则为异常,需要告知Alice。

5.4.5 同步状态4

如上图所示,该状态下,参考同步状态3。
另附消息发送函数enqueue()逻辑:

5.5 补充

net_plugin插件启动并建立连接后,会调用start_monitors(),一是监听网络连接状态,如果断开会重新尝试建立连接;二是监听交易时间,如果交易超时,则会将其移除入链队列。

6 controller

Controller模块位于/libraries/chain/下,是EOS区块入链的核心控制器,内容非常多,也非常重要。
Controller主要功能为:

  • 被chain_plugin初始化和启动
  • 对上提供区块和交易等相关接口
  • 对上提供区块和交易入链进度相关信号
  • 对下操作相关数据接口,进行数据管理

下图为Controller核心代码逻辑:

6.1 被chain_plugin初始化和启动

Controller模块由chain_plugin负责初始化和启动,chain_plugin根据启动参数启动Controller,从而对底层数据结构进行初始化操作。

6.2 对上提供区块和交易等相关接口

区块相关的接口有:

  • abort_block() :取消上一个正在生产的区块,其中的交易转到本次新区块中处理
  • start_block() :开始一个新区块生产,启动一个异步定时器等待交易被插入,定时器结束后开始打包等后续工作。此处还会进行BFT共识机制的处理。
  • finalize_block():本区块时间已到,打包区块
  • sign_block():对区块进行签名
  • commit_block():提交区块入fork_db,等待不可逆
  • push_block() : 主要用于初始化时或同步时插入区块,内部调用apply_block()函数以及上述几个函数,但没有异步定时器等代码。
  • pop_block():同步时fork_db发现分叉,需要对短链进行pop_block()
  • on_irreversible():区块不可逆时触发

交易相关的接口有:

  • push_transaction() : 插入交易
  • push_scheduled_transaction() : 插入延期的交易

其它接口:

  • set_proposed_producers() :更新BP列表

6.3 对上提供区块和交易入链进度相关信号

提供了一些进度信号,producer_plugin、mongodb_plugin、net_plugin等等会进行订阅,从而完成各自的功能。
相关信号有:

  • pre_accepted_block :调用push_block()(同步、刚启动时从数据库恢复),区块尚未add()到fork_db之前,先发送这个信号
  • accepted_block_header:调用push_block()或commit_block()(生产区块),区块被add()到fork_db后,发送该信号
  • accepted_block:调用commit_block(),区块被add()到fork_db后,发送该信号
  • irreversible_block:调用push_block()恢复数据时或on_irreversible()时,发送该信号
  • accepted_transaction:调用push_transaction()或push_scheduled_transaction()成功后,发送该信号
  • applied_transaction,同上

例如,mongodb_plugin收到accepted_block后会将数据写入mongodb。

6.4 对下操作相关数据接口,进行数据管理

数据结构包括如下:

  • pending :正在生产的区块,abort_block(),start_block()等均对它修改
  • unapplied_transactions:上一个区块未出块成功,则其交易在abort_block()时转存到这里,以便新的区块start_block()时重新push_transaction()
  • fork_db:区块生产完成后插入这里,可能存在分叉。当进行区块同步时,如果检测到分叉,则进行最长链的生成和选择。push_block()和commit_block()时调用add(),不可逆时或分叉时调用erase()
  • head : 指向fork_db的头块,用于生成下一个区块,以及进行快速比较等操作,fork_db变化则同时变化head
  • reversible_blocks:可逆的区块链。commit_block()时调用add(),on_ireeversible(),pop_block()时调用erase()
  • blog:不可逆的区块链,append only,on_irreversile()时调用append()
  • db : 不仅保存了区块,还保存了智能合约数据,账号数据等等其它数据。大部分数据修改的地方也会对其进行更新。

注意:

  • 区块广播时发送的是signed_block结构体,其继承signed_block_header-->block_header。
  • 各节点存储的数据还有更多信息,如DPOS+BFT相关的数据,存储在block_state-->block_header_state中。
  • signed_block中存储了所有交易的摘要:vector

7 eosio.system

主要实现了抵押RAM、申请BP、申请代理,投票、选举BP等功能。

转载自:https://my.oschina.net/u/4069047/blog/3005068

关于EOS出块BP备份的想法

假设使用一台机器作为BP节点机器。大家都知道eos的data目录超级大,备份的话只能全量备份。cp 的时间过于长,影响出块和RPC业务等提供。

暂时想到的方案如下

机器上跑2个nodeos进程,一个跑BP账号做出块节点,一个做同步几点。
保留三份data数据。出块节点初始化占一个,剩余两个,其中一个同步节点同步使用,同步节点定期切换剩余两个目录,保持2个目录数据与主网持平,当出块节点出问题导致data脏时,时切换到当时没有占用的data目录。然后删除脏目录,停止同步节点,并cp 同步节点前面用的data目录补齐第三目录。然后选一data重新运行同步节点。

且如果此机器提供RPC服务的话,可以做高可用。

优点

本方案可保证出块节点最大的运行时间。

缺点

data 三份的占用。

关于资源的抵押赎回,及查询抵押列表

关于抵押和赎回的方法,拿eosjs举例
https://eosio.github.io/eosjs/guides/2.-Transaction-Examples.html

Stake 抵押

const result = await api.transact({
  actions: [{
    account: 'eosio',
    name: 'delegatebw',
    authorization: [{
      actor: 'useraaaaaaaa',
      permission: 'active',
    }],
    data: {
      from: 'useraaaaaaaa',
      receiver: 'useraaaaaaaa',
      stake_net_quantity: '1.0000 SYS',
      stake_cpu_quantity: '1.0000 SYS',
      transfer: false,
    }
  }]
}, {
  blocksBehind: 3,
  expireSeconds: 30,
});

Unstake 赎回

const result = await api.transact({
  actions: [{
    account: 'eosio',
    name: 'undelegatebw',
    authorization: [{
      actor: 'useraaaaaaaa',
      permission: 'active',
    }],
    data: {
      from: 'useraaaaaaaa',
      receiver: 'useraaaaaaaa',
      unstake_net_quantity: '1.0000 SYS',
      unstake_cpu_quantity: '1.0000 SYS',
      transfer: false,
    }
  }]
}, {
  blocksBehind: 3,
  expireSeconds: 30,
});

操作细节看以上demo即可,下面我们说下抵押时transfer这个参数。
先看合约 (跳转代码)

if ( transfer ) {
    from = receiver;
}

如果设置transfer参数为true,及把来源的账户修改为接收账户。
也就变成了接收账户自己给自己抵押,由来源账户付钱

查看数据

查看代码
抵押数据表明为delband
(查看代码)

del_bandwidth_table     del_tbl( _self, from.value );

是以来源账户为查询的scope
所以RPC查询如下
post: https://api.eoslaomao.com/v1/chain/get_table_rows
data:

{
    "scope": "bcskillsurou",
    "code": "eosio",
    "table": "delband",
    "json": true
}

返回

{
    "rows": [
        {
            "from": "bcskillsurou",
            "to": "bcskillsurou",
            "net_weight": "0.0046 EOS",
            "cpu_weight": "0.0147 EOS"
        }
    ],
    "more": false
}