精通 Filecoin:Lotus真实数据处理之Provider初始化
摘要:精通Filecoin系列之StorageProvider 函数篇
StorageProvider 对象被存储矿工 API 对象所依赖,所以在启动存储矿工的过程中,DI 容器会调用 StorageProvider 函数(node/modules/storageminer.go)来创建它。StorageProvider 函数流程如下:
-
调用
NewFromLibp2pHost函数,生成StorageMarketNetwork对象。net := smnet.NewFromLibp2pHost(h) -
调用
NewLocalFileStore函数,生成FileStore存储对象。store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))NewLocalFileStore函数(go-fil-markets 类库 filestore/filestore.go)流程如下:base := filepath.Clean(string(basedirectory)) info, err := os.Stat(string(base))if !info.IsDir() { return nil, fmt.Errorf("%s is not a directory", base) }return &fileStore{string(base)}, nilNewLocalFileStore函数使用的路径为仓库目录。即碎片的临时目录就是仓库目录。 -
调用
CustomDealDecisionLogic函数,返回一个函数对象。在函数对象中调用我们提供的回调函数,进行自定义交易逻辑判断。opt := storageimpl.CustomDealDecisionLogic(func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {}) -
生成并返回
StorageProvider对象。p, err := storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), ibs, store, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)return p, nilNewProvider函数处理如下:-
生成
PieceIOWithStore对象。carIO := cario.NewCarIO() pio := pieceio.NewPieceIOWithStore(carIO, fs, bs) -
生成
Provider对象。h := &Provider{ net: net, proofType: rt, spn: spn, fs: fs, pio: pio, pieceStore: pieceStore, conns: connmanager.NewConnManager(), storedAsk: storedAsk, actor: minerAddress, dataTransfer: dataTransfer, dealAcceptanceBuffer: DefaultDealAcceptanceBuffer, pubSub: pubsub.New(providerDispatcher), } -
生成 fsm 状态组对象。
deals, err := NewProviderStateMachine( ds, &providerDealEnvironment{h}, h.dispatch, )
fsm 状态组对象使用的配置参数如下:h.deals = dealsreturn fsm.New(ds, fsm.Parameters{ Environment: env, StateType: storagemarket.MinerDeal{}, StateKeyField: "State", Events: providerstates.ProviderEvents, StateEntryFuncs: providerstates.ProviderStateEntryFuncs, FinalityStates: providerstates.ProviderFinalityStates, Notifier: notifier, })-
环境对象为
providerDealEnvironment。 -
状态对象为
MinerDeal。 -
状态字段为
State。 -
事件集合为
ProviderEvents,参考 storagemarket/impl/providerstates/provider_fsm.go 文件。 -
状态处理函数集合 为
ProviderStateEntryFuncs,状态机的状态处理器根据对应的状态获取到指定的函数进行处理。 -
终止状态集合为
ProviderFinalityStates。 -
通知对象为
Provider对象的dispatch方法。
-
环境对象为
-
使用配置选项,配置
Provider对象。h.Configure(options...) -
设置数据传输监听对象。
当开始数据传输、传输结束、传输错误时会发送dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))ProviderEventDataTransferInitiated、ProviderEventDataTransferCompleted、ProviderEventDataTransferFailed等事件到 fsm 状态组。 -
返回
Provider对象。
-
生成
在存储矿工启动过程自动调用 HandleDeals 函数(node/modules/storageminer.go)。在这个函数中,调用 StorageProvider 对象的 Start 方法,从而启动这个对象。
Start 方法执行过程如下:
-
调用
StorageMarketNetwork网络对象的SetDelegate设置代理/委托为自身。
网络对象的实现为err := p.net.SetDelegate(p)libp2pStorageMarketNetwork结构体(storagemarket/network/libp2p_impl.go)。它的SetDelegate方法内容如下:
上面分别设置网络对象的impl.receiver = r impl.host.SetStreamHandler(storagemarket.DealProtocolID, impl.handleNewDealStream) impl.host.SetStreamHandler(storagemarket.AskProtocolID, impl.handleNewAskStream) return nilhandleNewDealStream方法处理DealProtocolID协议,表示存储;handleNewAskStream方法 处理AskProtocolID协议,表示 ask。handleNewDealStream方法内容如下:// 客户端 peer id remotePID := s.Conn().RemotePeer()buffered := bufio.NewReaderSize(s, 16)// 对流进行包装 ds := &dealStream{remotePID, impl.host, s, buffered}// 调用StorageProvider对象的HandleDealStream方法,处理客户端存储请求 impl.receiver.HandleDealStream(ds) -
在协程中调用
StorageProvider对象的restartDeals方法,重新进行交易处理。restartDeals方法流程如下:-
从 fsm 状态组对象中获取所有的交易对象。
var deals []storagemarket.MinerDeal err := c.deals.List(&deals) -
遍历所有的交易对象,进行下面的处理:
- 如果当前交易对象已经终止,则进行下一个处理。
- 如果当前交易对象的连接已经关闭,则进行下一个处理。
-
发送初始交易事件给 fsm 状态组。
交易提案的 Cid 表示了状态机的名称/编号。err = c.deals.Send(deal.ProposalCid, storagemarket.ProviderEventRestart)
-
从 fsm 状态组对象中获取所有的交易对象。
- 返回空值。
来源链接:https://www.8btc.com/article/630375
来源:乔疯
- 免责声明
- 世链财经作为开放的信息发布平台,所有资讯仅代表作者个人观点,与世链财经无关。如文章、图片、音频或视频出现侵权、违规及其他不当言论,请提供相关材料,发送到:2785592653@qq.com。
- 风险提示:本站所提供的资讯不代表任何投资暗示。投资有风险,入市须谨慎。
- 世链粉丝群:提供最新热点新闻,空投糖果、红包等福利,微信:juu3644。

挖矿淘金



