go-graphsync's Introduction

go-graphsync

An implementation of the graphsync protocol in Go!

Background

GraphSync is a protocol for synchronizing IPLD graphs among peers. It allows a host to make a single request to a remote peer for all of the results of traversing an IPLD selector on the remote peer's local IPLD graph.

go-graphsync provides an implementation of the Graphsync protocol in Go.

Go-IPLD-Prime

go-graphsync relies on go-ipld-prime to traverse IPLD Selectors in an IPLD graph. go-ipld-prime implements the IPLD specification in Go and is an alternative to older implementations such as go-ipld-format and go-ipld-cbor. In order to use go-graphsync, some understanding and use of go-ipld-prime concepts is necessary.

If your existing library (e.g. go-ipfs or go-filecoin) uses these older libraries, you can largely use go-graphsync without switching to go-ipld-prime across your codebase, but it will require some translations.

Install

go-graphsync requires Go >= 1.13 and can be installed using Go modules.

Usage

Initializing a GraphSync Exchange

import (
  "context"

  graphsync "github.com/ipfs/go-graphsync/impl"
  gsnet "github.com/ipfs/go-graphsync/network"
  ipld "github.com/ipld/go-ipld-prime"
  "github.com/libp2p/go-libp2p/core/host"
)

var ctx context.Context
var h host.Host // a libp2p host; older libp2p releases export this type from go-libp2p-core/host
var lsys ipld.LinkSystem

network := gsnet.NewFromLibp2pHost(h)
exchange := graphsync.New(ctx, network, lsys)

Parameter Notes:

  1. ctx is the parent context for all of GraphSync.
  2. network is a network abstraction provided to Graphsync on top of libp2p. This allows graphsync to be tested without the actual network.
  3. lsys is a go-ipld-prime LinkSystem, which provides mechanisms for loading and constructing go-ipld-prime nodes from a link, and for saving go-ipld-prime nodes as serialized data.

Using GraphSync With An IPFS BlockStore

GraphSync provides a convenience function in the storeutil package for integrating with Blockstores from IPFS.

import (
  "context"

  graphsync "github.com/ipfs/go-graphsync/impl"
  gsnet "github.com/ipfs/go-graphsync/network"
  storeutil "github.com/ipfs/go-graphsync/storeutil"
  blockstore "github.com/ipfs/go-ipfs-blockstore"
  "github.com/libp2p/go-libp2p/core/host"
)

var ctx context.Context
var h host.Host // a libp2p host
var bs blockstore.Blockstore

network := gsnet.NewFromLibp2pHost(h)
lsys := storeutil.LinkSystemForBlockstore(bs)

exchange := graphsync.New(ctx, network, lsys)

Calling Graphsync

var exchange graphsync.GraphSync
var ctx context.Context
var p peer.ID
var selector ipld.Node
var rootLink ipld.Link

var responseProgress <-chan graphsync.ResponseProgress
var errors <-chan error

responseProgress, errors = exchange.Request(ctx, p, rootLink, selector)

Parameter Notes:

  1. ctx is the context for this request. To cancel an in-progress request, cancel the context.
  2. p is the peer you will send this request to.
  3. rootLink is an IPLD Link, i.e. a CID (cidLink.Link{Cid}), identifying the root of the traversal.
  4. selector is an IPLD selector node. We recommend using the selector builders from go-ipld-prime to construct these.

Response Type

type ResponseProgress struct {
  Node      ipld.Node // a node which matched the graphsync query
  Path      ipld.Path // the path of that node relative to the traversal start
  LastBlock struct {  // LastBlock stores the Path and Link of the last block edge we had to load.
    ipld.Path
    ipld.Link
  }
}

The above provides both immediate and relevant metadata for matching nodes in a traversal, and is very similar to the information provided by a local IPLD selector traversal in go-ipld-prime.

Contribute

PRs are welcome!

Before doing anything heavy, check out the Graphsync Architecture.

See our Contributing Guidelines for more info.

License

This library is dual-licensed under Apache 2.0 and MIT terms.

Copyright 2019. Protocol Labs, Inc.

go-graphsync's People

Contributors

aarshkshah1992, acruikshank, alexey-n-chernyshov, dependabot[bot], dirkmc, djdv, galargh, gammazero, hacdias, hannahhoward, hinshun, hsanjuan, ipfs-mgmt-read-write[bot], johnnymatthews, lanzafame, liamsi, marten-seemann, masih, michaelavila, mvdan, rvagg, stebalien, warpfork, web-flow, web3-bot, whyrusleeping, willscott

go-graphsync's Issues

[flake] TestFailedRequest fails

  === RUN   TestFailedRequest
      channelassertions.go:41: 
          	Error Trace:	channelassertions.go:41
          	            				channelassertions.go:13
          	            				testutil.go:168
          	            				requestmanager_test.go:267
          	Error:      	Not equal: 
          	            	expected: 0
          	            	actual  : 1
          	Test:       	TestFailedRequest
          	Messages:   	should receive an error
  --- FAIL: TestFailedRequest (1.01s)

Graphsync does not return an error when the request context is cancelled

We found a problem in go-data-transfer whereby, when the node shuts down:

  • the contexts for all the outstanding requests are cancelled
  • go-data-transfer's graphsync transport consumes all outstanding requests
  • the graphsync request completes without returning an error
  • the transfer is marked as complete, even though it is incomplete

We worked around this problem by checking whether the request context was cancelled when the request finishes: filecoin-project/go-data-transfer#134. The correct solution would be to fix the underlying problem, which appears to be that graphsync does not return an error when the request context is cancelled.

graphsync client should handle missing blocks on responder side

Many legit use cases rely on sparse DAGs, understood as DAGs containing blocks not present in the responder's blockstore. Examples include federated/decentralised DAGs, blockchain nodes with limited history lookbacks, DAGs spanning many Filecoin deals, etc.

Right now, go-graphsync FUBARs with: Remote Peer Is Missing Block when the responder indicates that a block is missing, which makes use cases like that impossible.

@hannahhoward indicates:

Graphsync returns error channel that can send multiple errors. When you get a missing block, it sends an error on the error channel but the traversal continues. The assumption was the consuming code would be reading the error channel and would decide whether to terminate or continue.
So yes there's a way.
I think the error should be singular, and we should put the missing block as extra information in the response channel.
I'm definitely open to that redesign -- it's a breaking change of course.

[flake] TestRejectRequestsByDefault

 === RUN   TestRejectRequestsByDefault
  got receive err
      testutil.go:171: 
          	Error Trace:	testutil.go:171
          	            				graphsync_test.go:172
          	Error:      	Should be false
          	Test:       	TestRejectRequestsByDefault
          	Messages:   	shouldn't have sent second error but did
  --- FAIL: TestRejectRequestsByDefault (0.01s)

[flake] TestNetworkErrors/network_error_block_send

  === RUN   TestNetworkErrors/network_error_block_send
      channelassertions.go:72: 
          	Error Trace:	channelassertions.go:72
          	            				channelassertions.go:48
          	            				responsemanager_test.go:1037
          	            				responsemanager_test.go:629
          	Error:      	Not equal: 
          	            	expected: 0
          	            	actual  : 1
          	Test:       	TestNetworkErrors/network_error_block_send
          	Messages:   	should clear request

Chore: Mock go-ipld-prime Interfaces

Understanding that go-ipld-prime will take a long time to write, I am going to "best-guess" what its interfaces will look like, and develop against that. If this presents issues for the IPLD team, I'll adjust my code, but in the meantime I'll produce a mock interface I can test against!

3-dimensional memory/throughput benchmarks of go-graphsync

Goal: characterise memory load and throughput of graphsync+stores in isolation with everything else.

Assuming a perfect network, benchmark three dimensions:

Store dimension:

  • badger2
  • memory (control)
  • filesystem (low prio)
  • badger (low prio)

Payload size dimension:

  • 1MiB
  • 10MiB
  • 100MiB
  • 1GiB (⬇️ activated with a flag)
  • 32GiB
  • 64GiB

Concurrency dimension:

  • 2^(0-8)

Use the Go benchmark custom metrics API to record useful metrics.
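
As a rough sketch of how the three dimensions could be enumerated and reported, the example below builds the benchmark matrix and attaches a custom metric with testing.B.ReportMetric. The benchCase type, the benchMatrix helper and the metric are illustrative assumptions, and the benchmark body is left as a placeholder rather than a real transfer.

```go
package main

import (
	"fmt"
	"testing"
)

// benchCase is one point in the store x payload x concurrency matrix.
type benchCase struct {
	Store       string
	PayloadMiB  int
	Concurrency int
}

// benchMatrix enumerates every combination of the three dimensions.
func benchMatrix() []benchCase {
	stores := []string{"badger2", "memory", "filesystem", "badger"}
	payloadsMiB := []int{1, 10, 100, 1 << 10, 32 << 10, 64 << 10} // 1MiB .. 64GiB
	var cases []benchCase
	for _, s := range stores {
		for _, p := range payloadsMiB {
			for exp := 0; exp <= 8; exp++ { // concurrency 2^0 .. 2^8
				cases = append(cases, benchCase{Store: s, PayloadMiB: p, Concurrency: 1 << exp})
			}
		}
	}
	return cases
}

// BenchmarkMatrix would live in a _test.go file in practice.
func BenchmarkMatrix(b *testing.B) {
	for _, c := range benchMatrix() {
		c := c
		name := fmt.Sprintf("%s/%dMiB/x%d", c.Store, c.PayloadMiB, c.Concurrency)
		b.Run(name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				// Placeholder: run c.Concurrency transfers of c.PayloadMiB
				// each against c.Store here.
			}
			// Custom metric: total payload moved per iteration.
			b.ReportMetric(float64(c.PayloadMiB*c.Concurrency), "MiB/op")
		})
	}
}

func main() {
	fmt.Println(len(benchMatrix()), "benchmark cases")
}
```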

RFC: Accounting for CPU/Memory cost on responder

The Graphsync spec states:


Other notes

Cost to the responder. The graphsync protocol will require a non-zero additional overhead of CPU and memory. This cost must be very clearly articulated, and accounted for, otherwise we will end up opening ugly DoS vectors


This is true. If responders simply accept every GraphSync query sent to them, a DoS attack becomes trivial. I am wondering if and how we need to account for this before GraphSync is integrated into go-filecoin or any production system.

Does this library make sense?

When I joined the Graphsync team I feel like I did not understand a lot about the problem we're trying to solve. I want to write down what I think I now understand to see if it is correct.

As I now understand it, the IPLD team is working on a library to handle linked data that could be in any format, traversed in arbitrary ways with links between data sometimes crossing block boundaries and sometimes not, assuming the data even uses blocks as the underlying storage. In other words, it's an incredibly generalized concept.

Filecoin has a very specific use case: it needs to sync Blockchains quickly without multiple network round trips. And a blockchain is possibly the simplest possible concrete form of linked data, where each node is one block with one link to its parent.

I think I now understand why the Graphsync wire protocol we're building isn't the ideal IPLD replication protocol, because it primarily deals with blocks instead of nodes (aside from separate discussions about whether it's too complicated to support multiple requests and responses).

To implement the current protocol, this library's interactions with the selector traversal functions are going to be somewhat upside down. Because the protocol is dealing mostly in blocks, most of the work in this library is going to be accomplished by overriding the LinkLoader function (or more specifically the underlying RawLoader function) used by IPLD:

  1. In the responder, when IPLD's selector traversal functions request a block, I will load it from a local datastore (using some kind of function supplied to me by Filecoin, I assume), and then I'm going to also write it out to the network to transmit to the requestor.

  2. In the requestor, I'm going to supply a loader function that loads blocks from the network rather than a local data store, as they come in, essentially supplying an io.Reader that blocks on first Read till I get the response from the network.

It's not entirely clear how I will use the Visit functions, except to transmit data back to Filecoin in the requestor, assuming it wants nodes and not blocks.

Anyway, I notice that:

  1. Filecoin has an immediate specific need

  2. The IPLD team is working on a somewhat related problem which is much more complex and generalized.

  3. Trying to use IPLD's work to solve Filecoin's need by implementing a somewhat generalized but still less than ideal replication protocol called Graphsync, which itself will require a lot of shoe-horning, may not be the most expedient or clean way to get to solving Filecoin's need.

So basically, I just want to make sure the current course makes sense and we want to continue on it.

I feel capable of writing this Graphsync library, I just think it's gonna end up looking pretty weird.

[flake] TestValidationAndExtensions/on_its_own,_should_fail_validation

  === RUN   TestValidationAndExtensions/on_its_own,_should_fail_validation
      channelassertions.go:41: 
          	Error Trace:	channelassertions.go:41
          	            				channelassertions.go:13
          	            				responsemanager_test.go:1103
          	            				responsemanager_test.go:176
          	Error:      	Not equal: 
          	            	expected: 0
          	            	actual  : 1
          	Test:       	TestValidationAndExtensions/on_its_own,_should_fail_validation
          	Messages:   	should receive status

Blocks can be zero sized

Currently, if we send a zero-size block, we won't process block hooks, etc. This case may actually be fine, but we can't assume that a zero block size implies no block.
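
One way to avoid inferring "no block" from a zero size is to carry presence explicitly. The sketch below is purely illustrative: blockData and shouldRunHooks are hypothetical names, not types or functions from go-graphsync.

```go
package main

import "fmt"

// blockData pairs a block's bytes with an explicit presence flag, so that an
// empty block and an absent block can be told apart.
type blockData struct {
	Data    []byte
	Present bool
}

// shouldRunHooks decides on presence rather than on len(Data).
func shouldRunHooks(b blockData) bool { return b.Present }

func main() {
	empty := blockData{Data: []byte{}, Present: true} // a real zero-size block
	none := blockData{}                               // no block at all

	// By size alone the two cases look identical.
	fmt.Println(len(empty.Data) == len(none.Data))
	fmt.Println(shouldRunHooks(empty), shouldRunHooks(none))
}
```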

DoNotSend extension should NOT send ALL CIDs received

To support transfer resumption, a Graphsync requestor sends a DoNotSendCIDs message to the responder with ALL the CIDs received so far. This solution does NOT scale, as sending ALL CIDs received for large data transfers causes the message size to exceed the message size limits set by the Graphsync protocol.

We need to redesign the DoNotSendCIDs extension.
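
A back-of-the-envelope calculation shows how quickly the list grows. The sketch below assumes 36-byte CIDs (CIDv1 with a one-byte codec and a sha2-256 multihash), and, purely for illustration, 256 KiB blocks and a 4 MiB message size limit. The block size and the limit are assumptions, not values taken from the Graphsync specification.

```go
package main

import "fmt"

const cidBytes = 36 // CIDv1: version + one-byte codec + 2-byte multihash header + 32-byte digest

// doNotSendBytes estimates the raw CID bytes needed to list every block of a
// transfer, ignoring message framing and intermediate (non-leaf) blocks.
func doNotSendBytes(transferBytes, blockBytes int64) int64 {
	blocks := (transferBytes + blockBytes - 1) / blockBytes // round up
	return blocks * cidBytes
}

func main() {
	const (
		kib = int64(1) << 10
		mib = int64(1) << 20
		gib = int64(1) << 30
	)
	// Assumed values for illustration only.
	blockSize, limit := 256*kib, 4*mib
	for _, size := range []int64{1 * gib, 32 * gib, 64 * gib} {
		n := doNotSendBytes(size, blockSize)
		fmt.Printf("%d GiB transfer: %d bytes of CIDs, over limit: %t\n", size/gib, n, n > limit)
	}
}
```

Under these assumptions a 32 GiB transfer already needs about 4.5 MiB of CIDs, so the list alone outgrows the assumed limit well before the transfer is complete.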

Add extension for 'do not explore cids'

Currently, graphsync supports an extension for doNotSendCIDs, which provides a list of CIDs that should not be sent during a transfer.

It may be the case that the requester wants to receive a DAG, but wants to stop graph retrieval whenever a given CID is reached. For instance, consider the use case of synchronizing a chain, but only back to the point of previous synchronization.

This could either be done as an extension to graphsync, as proposed here, or incorporated into the IPLD selector syntax.

Adding an extension here may remain useful as a way to more efficiently resume transfers with the ability to collapse the described 'cids i already have' to just roots of subtrees that have been received previously. The graphsync extension path also is likely a faster path for prototyping this behavior than the selector alternative.
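
The proposed behaviour can be sketched with a toy traversal that refuses to visit, or descend past, a given set of IDs. Everything here is illustrative: string IDs stand in for CIDs, and dag and walk are hypothetical names rather than go-graphsync or go-ipld-prime APIs.

```go
package main

import "fmt"

// dag maps a block ID to the IDs it links to. String IDs stand in for CIDs.
type dag map[string][]string

// walk visits blocks depth-first from root, but never visits or descends
// past any ID listed in stopAt.
func walk(g dag, root string, stopAt map[string]bool) []string {
	var visited []string
	seen := map[string]bool{}
	var visit func(id string)
	visit = func(id string) {
		if stopAt[id] || seen[id] {
			return
		}
		seen[id] = true
		visited = append(visited, id)
		for _, child := range g[id] {
			visit(child)
		}
	}
	visit(root)
	return visited
}

func main() {
	// A chain where "b2" is the point of previous synchronization.
	chain := dag{"head": {"b3"}, "b3": {"b2"}, "b2": {"b1"}, "b1": {"genesis"}}
	fmt.Println(walk(chain, "head", map[string]bool{"b2": true}))
}
```

Listing only "b2" is enough to skip everything behind it, which is the collapsing of 'cids i already have' to subtree roots described above.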

block sent listener is not called for every block sent

I added TestDataSentEvents as an integration test in go-data-transfer to watch for events emitted. It seems that gsBlockSentHook is not called every time that a block is sent (whereas gsIncomingBlockHook is called every time a block is received). gsBlockSentHook is registered with graphsync using RegisterBlockSentListener.

The full output from the test is below. Note the output for the last two lines:

prov [Completed] CleanupComplete: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completed] CleanupComplete: qd 20439, rcvd 0, sent 14295, total: 0

On the provider, graphsync emitted "receive" events for 20439 bytes.
On the client, graphsync emitted "queue" events for 20439 bytes, but only emitted "sent" events for 14295 bytes.

Full output:

clnt [Requested] Open: qd 0, rcvd 0, sent 0, total: 0
prov [Requested] Open: qd 0, rcvd 0, sent 0, total: 0
prov [Ongoing] Accept: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] Accept: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] ResumeResponder: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 966, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 1990, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 3014, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataSent: qd 3014, rcvd 0, sent 966, total: 0
clnt [Ongoing] DataQueued: qd 4038, rcvd 0, sent 966, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 966, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 5062, rcvd 0, sent 966, total: 0
clnt [Ongoing] DataSent: qd 5062, rcvd 0, sent 1990, total: 0
clnt [Ongoing] DataQueued: qd 6086, rcvd 0, sent 1990, total: 0
clnt [Ongoing] DataQueued: qd 7110, rcvd 0, sent 1990, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 1990, sent 0, total: 0
clnt [Ongoing] DataSent: qd 7110, rcvd 0, sent 3014, total: 0
clnt [Ongoing] DataQueued: qd 8134, rcvd 0, sent 3014, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 3014, sent 0, total: 0
clnt [Ongoing] DataSent: qd 8134, rcvd 0, sent 4038, total: 0
clnt [Ongoing] DataQueued: qd 9158, rcvd 0, sent 4038, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 4038, sent 0, total: 0
clnt [Ongoing] DataSent: qd 9158, rcvd 0, sent 5062, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 5062, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 10182, rcvd 0, sent 5062, total: 0
clnt [Ongoing] DataSent: qd 10182, rcvd 0, sent 6086, total: 0
clnt [Ongoing] DataQueued: qd 11206, rcvd 0, sent 6086, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 6086, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 12230, rcvd 0, sent 6086, total: 0
clnt [Ongoing] DataSent: qd 12230, rcvd 0, sent 7110, total: 0
clnt [Ongoing] DataQueued: qd 13254, rcvd 0, sent 7110, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 7110, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 14278, rcvd 0, sent 7110, total: 0
clnt [Ongoing] DataSent: qd 14278, rcvd 0, sent 8134, total: 0
clnt [Ongoing] DataQueued: qd 15302, rcvd 0, sent 8134, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 8134, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 16326, rcvd 0, sent 8134, total: 0
clnt [Ongoing] DataSent: qd 16326, rcvd 0, sent 9158, total: 0
clnt [Ongoing] DataQueued: qd 17350, rcvd 0, sent 9158, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 9158, sent 0, total: 0
clnt [Ongoing] DataSent: qd 17350, rcvd 0, sent 10182, total: 0
clnt [Ongoing] DataQueued: qd 18374, rcvd 0, sent 10182, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 10182, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 19398, rcvd 0, sent 10182, total: 0
clnt [Ongoing] DataSent: qd 19398, rcvd 0, sent 11206, total: 0
clnt [Ongoing] DataQueued: qd 20422, rcvd 0, sent 11206, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 11206, sent 0, total: 0
clnt [Ongoing] DataSent: qd 20422, rcvd 0, sent 12230, total: 0
clnt [Ongoing] DataQueued: qd 20439, rcvd 0, sent 12230, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 13254, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 14278, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 14295, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 12230, sent 0, total: 0
clnt [TransferFinished] FinishTransfer: qd 20439, rcvd 0, sent 14295, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 13254, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 14278, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 15302, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 16326, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 17350, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 18374, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 19398, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 20422, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completing] ResponderCompletes: qd 20439, rcvd 0, sent 14295, total: 0
prov [Completing] Complete: qd 0, rcvd 20439, sent 0, total: 0
prov [Completed] CleanupComplete: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completed] CleanupComplete: qd 20439, rcvd 0, sent 14295, total: 0

    integration_test.go:342: 
        	Error Trace:	integration_test.go:342
        	Error:      	Not equal: 
        	            	expected: []uint64{0x3c6, 0x7c6, 0xbc6, 0xfc6, 0x13c6, 0x17c6, 0x1bc6, 0x1fc6, 0x23c6, 0x27c6, 0x2bc6, 0x2fc6, 0x33c6, 0x37c6, 0x3bc6, 0x3fc6, 0x43c6, 0x47c6, 0x4bc6, 0x4fc6, 0x4fd7}
        	            	actual  : []uint64{0xbc6, 0x13c6, 0x1bc6, 0x1fc6, 0x23c6, 0x27c6, 0x2fc6, 0x37c6, 0x3fc6, 0x43c6, 0x4bc6, 0x4fc6, 0x4fd7, 0x4fd7, 0x4fd7}
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -1,8 +1,4 @@
        	            	-([]uint64) (len=21) {
        	            	- (uint64) 966,
        	            	- (uint64) 1990,
        	            	+([]uint64) (len=15) {
        	            	  (uint64) 3014,
        	            	- (uint64) 4038,
        	            	  (uint64) 5062,
        	            	- (uint64) 6086,
        	            	  (uint64) 7110,
        	            	@@ -11,12 +7,10 @@
        	            	  (uint64) 10182,
        	            	- (uint64) 11206,
        	            	  (uint64) 12230,
        	            	- (uint64) 13254,
        	            	  (uint64) 14278,
        	            	- (uint64) 15302,
        	            	  (uint64) 16326,
        	            	  (uint64) 17350,
        	            	- (uint64) 18374,
        	            	  (uint64) 19398,
        	            	  (uint64) 20422,
        	            	+ (uint64) 20439,
        	            	+ (uint64) 20439,
        	            	  (uint64) 20439
        	Test:       	TestDataSentEvents
--- FAIL: TestDataSentEvents (0.01s)
FAIL

Debugger finished with exit code 0

Network errors not propagating up to RegisterNetworkErrorListener

It seems that a listener registered with RegisterNetworkErrorListener() will not receive errors emitted by the message queue.

For example the error events published on lines 178 and 187 of messagequeue.go:

func (mq *MessageQueue) sendMessage() {
	message, topic := mq.extractOutgoingMessage()
	if message == nil || message.Empty() {
		return
	}
	mq.eventPublisher.Publish(topic, Event{Name: Queued, Err: nil})
	defer mq.eventPublisher.Close(topic)
	err := mq.initializeSender()
	if err != nil {
		log.Infof("cant open message sender to peer %s: %s", mq.p, err)
		// TODO: cant connect, what now?
		mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("cant open message sender to peer %s: %w", mq.p, err)})
		return
	}
	for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
		if mq.attemptSendAndRecovery(message, topic) {
			return
		}
	}
	mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("expended retries on SendMsg(%s)", mq.p)})
}

Internal Graphsync Benchmarking Plan

This issue documents our initial goals for profiling go-graphsync performance with TestGround (https://github.com/testground/testground)

Goals

Assess Graphsync performance in a variety of scenarios ON ITS OWN. We want to identify performance issues in the Graphsync library itself, including memory leaks, performance bottlenecks, etc, when it's transferring various types of IPLD data under various network conditions.

In scope

We are assessing graphsync's performance with a minimal number of external dependencies: a libp2p stack, and a data store.

Out of scope

We are not attempting to test graphsync's performance with various other components in the IPFS or Filecoin stack (e.g. Bitswap, DHT, unusual blockstore setup situations, etc).

Prior art

Tasks

  • Move/Copy filecoin testground test to this repo
  • Add parameterization around unix FS file creation -- see what we used in
    func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
    - specifically include raw vs non-raw nodes, configurable chunk size
  • Add option for disk based datastore (memory performance tracking is meaningless otherwise) - we utilize temp folders
    func newTempDirMaker(b *testing.B) (*tempDirMaker, error) {
    + badger -
    defopts := badgerds.DefaultOptions
    - in a dockerized environment we can probably just write somewhere on the running disk and don't need to worry about temp files as the volumes get reset when shutdown
  • We can use testground heap profiling, but we may want to actually trigger heap profiles at specific points. Not sure if testground has a facility for this? We can certainly dump a profile with go (https://golang.org/pkg/runtime/pprof/#WriteHeapProfile) but I'm not sure if TestGround has a facility for moving it out of the docker container and onto the main disk? cc: @nonsense
  • We need to heap profile on the responder side as well as the requestor -- this may entail keeping the responder alive -- my read of the current code for Filecoin's testplan is that the routine on the responder side ends early
  • We also probably want to try this dump with and without an explicit GC beforehand (i.e. runtime.GC())
  • For accurate memory statistics, we should modify the response consumption on the requestor to consume the response channel as well as the error channel https://github.com/filecoin-project/lotus/blob/master/testplans/graphsync/main.go#L208 (https://github.com/ipfs/go-graphsync/blob/master/benchmarks/benchmark_test.go#L171 for example of correctly consuming response channel)
  • We may want to add testing of disconnects of various kinds
  • We need to integrate with testground as a service in our CI

This issue is an epic tracker. We can submit this over several PRs.

[flake] TestEncodingExtensions/responding_to_extensions

  === RUN   TestEncodingExtensions/responding_to_extensions
      channelassertions.go:41: 
          	Error Trace:	channelassertions.go:41
          	            				channelassertions.go:13
          	            				testutil.go:168
          	            				requestmanager_test.go:464
          	Error:      	Not equal: 
          	            	expected: 0
          	            	actual  : 1
          	Test:       	TestEncodingExtensions/responding_to_extensions
          	Messages:   	should receive an error

(on Windows)

Requestor pausing probably needs a rearchitect, or should be removed entirely

This module supports pausing and resuming in a number of ways:

  • A responder can pause by calling PauseResponse in an OutgoingBlockHook
  • A responder can pause by calling PauseResponse directly on the Graphsync instance
  • A responder can resume by calling UnpauseResponse in a RequestUpdatedHook
  • A responder can resume by calling UnpauseResponse directly on the Graphsync Instance
  • A requestor can pause by calling PauseRequest on the Graphsync instance
  • A requestor can unpause by calling UnpauseRequest on the Graphsync instance

Of these, the requestor pause/unpause is by far the most complicated to implement, and produces unpredictable behavior.

Graphsync is designed to operate in an untrusted environment, and as such, responders can't simply accept commands from requestors to pause at any time (I could DoS a responder by simply telling it to respond to requests I kept pausing until it held too much memory for all my requests).

I explored a number of ways to implement this, and eventually settled on a requestor dealing with pause/unpause by simply cancelling the request and sending it again with a do-not-send-cids extension.

There are a number of problems with this:

  1. It adds significant complexity to the requestor implementation, as we have to track all CIDs sent.
  2. It leads to unpredictable behavior -- as soon as we pause on the requestor side, we stop recording the CIDs we receive. However, the responder may send more CIDs before it gets the cancel message. This means upon unpause, those CIDs are sent a second time. This leads to #158.
  3. The PauseRequest/UnpauseRequest do something very different at the protocol level than what they're named. Where the pausing on the responder has a protocol level response code, pausing in the requestor is not a concept covered by the protocol. The methods implement a Cancel/Restart more than a pause/unpause (but still with the same request ID -- so the calling module doesn't know that we've actually done this inside the module -- oy).
  4. This was all done before go-data-transfer had implemented restarts. Since go-data-transfer implements restarts by tracking CIDs and using do-not-send-cids, this means we're tracking CIDs twice (go-graphsync doesn't track CIDs to disk, and it should combine its internal set with an external set passed by go-data-transfer correctly, but it's still kind of bizarre behavior).

Pause/Unpause is part of the protocol on the responder side. There is a response code that indicates a response has been paused, and a mechanism for the client to ask the responder to unpause. It makes sense to support pause/unpause on the responder side.

However, I think that pause/unpause for the requestor should not be part of go-graphsync. We should enable primitives to do this via higher level code:

  • Cancelling requests (already supported by cancelling the context on the call to Request)
  • Sending arbitrary extensions in request updates, i.e. SendExtensionData(RequestID, ...ExtensionData)
  • Possibly add the ability to pause a response as well as unpause a response in a RequestUpdatedHook

This enables a few ways you might implement in go-data-transfer:

  • Implement Pause/Unpause for the requesting side via sending an extension through graphsync and reading it in a request updated hook and pausing there... meaning ultimately the responder pauses/unpauses
  • Implement Pause/Unpause via simply communicating on data transfer protocol and then pausing by calling PauseResponse on the graphsync instance directly.

Note: these implementations may still require a fair amount of complexity, as any pause initiated on the requesting side must account for the responding side sending more data before it receives the pause request.

Alternatively, we can try to develop a pause/unpause request at the protocol level in go-graphsync so that we can at least more clearly define expected behavior.

Implementing Data Deduplication Across Simultaneous Queries

As described in ipld/specs#120, it would be great if we could hold off GCing the list of blocks we've sent to a peer until the end of a multi-part request. This currently might happen by accident, since as long as one of the queries that accesses a duplicate node hasn't completed yet, the reference won't be freed.

Each of the potential approaches mentioned in the issue above has its advantages and disadvantages. However, none of them seem like a particularly huge deal to implement since the protocol is already flexible enough to handle most of it.

@hannahhoward I'm happy to help with the implementation work here as well.

[flake] TestLocallyFulfilledFirstRequestSucceedsLater

  === RUN   TestLocallyFulfilledFirstRequestSucceedsLater
      asyncloader.go:90: 
          	Error Trace:	asyncloader.go:90
          	            				requestmanager_test.go:325
          	Error:      	Should not be: 0
          	Test:       	TestLocallyFulfilledFirstRequestSucceedsLater
          	Messages:   	did not clean up request properly
  --- FAIL: TestLocallyFulfilledFirstRequestSucceedsLater (0.01s)

Better errors when rejecting requests

Ideally, we'd return an error indicating why. For example, we forbid infinitely recursive queries. Ideally, we'd report this back to the caller given that the client hasn't actually done anything wrong.

Interface to register listeners for network errors on receiving side

Currently there is a RegisterNetworkErrorListener to listen for errors that occur when sending a request.
It would be useful to also be able to listen for errors that occur while processing a stream on the receiving side. Suggested name: RegisterReceiverNetworkErrorListener

Motivation

When a client wants to send data to a provider

  1. go-data-transfer client opens a "push" channel to the provider
  2. go-data-transfer provider opens a go-graphsync query to the client for the file
  3. go-graphsync responds with the file data

If there is a network error (eg if the provider is restarted), the provider should wait for the client to restart the "push" channel.
The client can begin attempting to restart the channel as soon as graphsync reports a network error.

Needed for Miner should not try to dial the client to restart a data transfer

Implementation proposal

Currently when there is a network error on the receiving side, the networking layer calls ReceiveError(err). ReceiveError could use a pubsub mechanism to call the listeners, or the networking layer could do that directly.

Ideally the listener would take two parameters: func(peer.ID, error)
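A rough sketch of the pubsub variant, assuming a registry that ReceiveError (or the networking layer) publishes into. `peerID` stands in for libp2p's peer.ID so the example needs only the standard library, and `receiverErrorListeners` is a hypothetical name:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// peerID is a stand-in for libp2p's peer.ID.
type peerID string

// ReceiverErrorListener has the two-parameter shape suggested above.
type ReceiverErrorListener func(p peerID, err error)

// receiverErrorListeners is a small, hypothetical listener registry.
type receiverErrorListeners struct {
	mu        sync.Mutex
	next      int
	listeners map[int]ReceiverErrorListener
}

func newReceiverErrorListeners() *receiverErrorListeners {
	return &receiverErrorListeners{listeners: make(map[int]ReceiverErrorListener)}
}

// Register adds a listener and returns a function that removes it again.
func (r *receiverErrorListeners) Register(l ReceiverErrorListener) (unregister func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.next
	r.next++
	r.listeners[id] = l
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.listeners, id)
	}
}

// Publish is what ReceiveError would call. Listeners run outside the lock
// so that a listener may register or unregister without deadlocking.
func (r *receiverErrorListeners) Publish(p peerID, err error) {
	r.mu.Lock()
	snapshot := make([]ReceiverErrorListener, 0, len(r.listeners))
	for _, l := range r.listeners {
		snapshot = append(snapshot, l)
	}
	r.mu.Unlock()
	for _, l := range snapshot {
		l(p, err)
	}
}

func main() {
	reg := newReceiverErrorListeners()
	unregister := reg.Register(func(p peerID, err error) {
		fmt.Printf("receive error from %s: %v\n", p, err)
	})
	reg.Publish("peerA", errors.New("stream reset"))
	unregister()
	reg.Publish("peerA", errors.New("not delivered"))
}
```

Returning an unregister function mirrors how a caller would stop listening once the "push" channel has been restarted.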

GraphSync user can receive a response from another node

As a user of the GraphSync client library, I want to be able to receive a response to a graphsync query that I sent to another node, so that I can see results of a graphsync query.

  • Setup ReceiveMessage method on GraphSync library so GraphSync works as a Receiver for the GraphSyncNetwork
  • Setup ProcessResponse method on RequestManager so incoming responses are passed into the original requestor of the graph sync response
  • Setup ReceiveMessage to call ProcessResponse on the RequestManager and ProcessRequest on the ResponseManager

--

Acceptance criteria:

  • Setup two nodes with MockNet
  • Setup GraphSync on both nodes
  • Send a query from one node to the other
  • Setup mock goselector to return a result for that query
  • Verify that a result is received back on the request that matches the result of the query

Add CancelRequest method to API

Currently the only way to cancel a graphsync request is by cancelling the context passed to Request().

It's not possible to determine when all events for the request have been processed. So the calling code may take actions that assume the request has completed when in fact some events are still being processed. This can cause races, for example filecoin-project/lotus#6968 (comment)

Proposal: Add a method to the graphsync API that cancels the request and waits for all event queues for the request to be drained:
CancelRequest(graphsync.RequestID)
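One way the blocking behavior could look, sketched with standard-library primitives only. `manager`, `Start` and the string events are assumptions made for the example. The guarantee shown is that once CancelRequest returns, no further events for that request are handled:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// RequestID stands in for graphsync.RequestID.
type RequestID int

type inProgress struct {
	cancel context.CancelFunc
	done   chan struct{} // closed when the request's event loop has exited
}

// manager is a hypothetical, heavily simplified request manager.
type manager struct {
	mu       sync.Mutex
	requests map[RequestID]*inProgress
}

func newManager() *manager {
	return &manager{requests: make(map[RequestID]*inProgress)}
}

// Start runs an event loop for the request until it is cancelled or the
// events channel is closed.
func (m *manager) Start(id RequestID, events <-chan string, handle func(string)) {
	ctx, cancel := context.WithCancel(context.Background())
	ip := &inProgress{cancel: cancel, done: make(chan struct{})}
	m.mu.Lock()
	m.requests[id] = ip
	m.mu.Unlock()
	go func() {
		defer close(ip.done)
		for {
			select {
			case <-ctx.Done():
				return
			case ev, ok := <-events:
				if !ok {
					return
				}
				handle(ev)
			}
		}
	}()
}

// CancelRequest cancels the request and blocks until its event loop has
// stopped, so the caller can safely assume the request is finished.
func (m *manager) CancelRequest(id RequestID) error {
	m.mu.Lock()
	ip, ok := m.requests[id]
	delete(m.requests, id)
	m.mu.Unlock()
	if !ok {
		return fmt.Errorf("request %d not found", id)
	}
	ip.cancel()
	<-ip.done
	return nil
}

func main() {
	m := newManager()
	events := make(chan string)
	m.Start(1, events, func(ev string) { fmt.Println("handled", ev) })
	events <- "block-1"
	if err := m.CancelRequest(1); err != nil {
		fmt.Println("cancel failed:", err)
		return
	}
	fmt.Println("request 1 cancelled")
}
```

Cancelling only the context, by contrast, returns immediately and leaves the caller guessing whether a handler is still running.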

On resume graphsync occasionally enqueues the same outgoing block twice for a single file

While running go-data-transfer tests I've noticed that graphsync seems to occasionally enqueue the same outgoing block twice for the same file. This appears to happen when the transfer is paused and then resumed.

This causes an issue with go-data-transfer because the provider calculates the queued bytes by summing the size of all queued blocks. However when a duplicate block is queued, the block doesn't actually get sent across the wire. So the provider may expect payment for a block that hasn't been sent.

The provider uses RegisterOutgoingBlockHook() to watch for enqueued blocks.

I've created a test in go-data-transfer to demonstrate the issue:
filecoin-project/go-data-transfer#152

In this case the transfer is paused after the second block is sent. When the transfer is resumed the second block is sent again. See the example below:

$ go test ./impl -run TestGSDoubleSend -count=10 -v
=== RUN   TestGSDoubleSend
=== RUN   TestGSDoubleSend/fast_unseal,_payment_channel_ready
0: QmcJoNHgRhpowes6X3YHxxhhi9Vu1MqYfLKoDGtCWy7eae
1: bafkreiegr5x3na4gsjokgc3oyp6evqr4brnvuuamqv7qf5j55f46afbwa4
2: bafkreifaglzaff5gxfehb7o37c3gpq2yzvcvzi7to4lam6h726p2w5u46y
3: bafkreiblnz4nfgic5qyh7l5tgfx2676fk7ve6vkbhjfoicsqsjo7jbdt5m
4: bafkreicbrli2logxql3h36sgh6wfqouxibsiuzymnwvi2cknxdoc7wtknu
5: bafkreigzpidqubh2afgmapairjdtwhcmh332klpaouasyuurdhwu2txxju
6: bafkreibqsy6pn3y5oy6m2dnhacbimqfx4l6352gz27kikbvtf6kyvjc7ly
7: bafkreifda52jtpkh2bavgdnlit5yeoa3xlj42i5evjgsgefyh6r3mull7m
8: bafkreialq7brtimc3zyrgg753ndukywv5qphu2lkyv63twp5bwncq77z74
9: bafkreig6sewnzawa2yhqfpf54dex7f3wo2m6royoldvwiydrcm2qot233i
10: bafkreihujcyc5tqemqys7wxrgo7n6bsfkswfc6yxw4x6npgb3kxmezb4pe
11: bafkreiahyrgpq52w6ijimwoecbrpn7egx6fxn3wavevb47r7cgayadx7sq
12: bafkreibogblledlbehxp3z25sl7ixlweqg5cirawe6ah5htfj6exiv4rhm
13: bafkreib5kvazffowsdcyqtzifpyq4jaztatodlxmsl7si5622ad2hm4isi
14: bafkreib6bvpds3jyylmfe6b77qd73llqs53enwmjlcedwtpjpr24h4i4qm
15: bafkreififpy27chexnldi2xlpn2ciygbt6oervjuqrvoxsowjfrwsaqxuq
16: bafkreiaqymh43iwxge5lcnv65kj5oxczspwxmp5bude7qqhletvfuufgdu
17: bafkreibuygwyfnws63k6cc4krmi7lhn5zzpqqarx3bj65cruubmwl6p2r4
18: bafkreigblr6bvqcppjsdy4qcuer7o67ol57b7jec642pxgljuemfqzmd64
19: bafkreia3lhfluhxwhnfjmfzvsmjyxdhjb466pep3njg6zcoy57tavgtjvi
20: bafkreign7cs2fgqrufywaniauz43ktvxjgi6t66curvhsfzcguqn2gapoe
  provider completed
  client completed
--- PASS: TestGSDoubleSend (0.02s)
    --- PASS: TestGSDoubleSend/fast_unseal,_payment_channel_ready (0.02s)

=== RUN   TestGSDoubleSend
=== RUN   TestGSDoubleSend/fast_unseal,_payment_channel_ready
0: QmcJoNHgRhpowes6X3YHxxhhi9Vu1MqYfLKoDGtCWy7eae
1: bafkreiegr5x3na4gsjokgc3oyp6evqr4brnvuuamqv7qf5j55f46afbwa4
2: bafkreiegr5x3na4gsjokgc3oyp6evqr4brnvuuamqv7qf5j55f46afbwa4
    Already saw this CID - it was sent in position 1
FAIL	github.com/filecoin-project/go-data-transfer/impl	1.470s
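The duplicate check that produces the "Already saw this CID" message can be sketched roughly as below. This is an assumption about the shape of such a check, not the code from the linked test, and `queuedTracker` is a hypothetical name. It also shows how a provider could avoid counting a re-enqueued block twice when summing queued bytes:

```go
package main

import "fmt"

// queuedTracker sums the size of enqueued blocks, counting each CID once.
// Hypothetical helper; a real provider would feed it from an outgoing
// block hook.
type queuedTracker struct {
	seen   map[string]int // CID -> position at which it was first enqueued
	queued uint64         // total bytes counted so far
}

func newQueuedTracker() *queuedTracker {
	return &queuedTracker{seen: make(map[string]int)}
}

// OnOutgoingBlock records a block. For a duplicate it returns the position
// of the first sighting and false, and leaves the byte total unchanged.
func (q *queuedTracker) OnOutgoingBlock(cid string, size uint64) (pos int, fresh bool) {
	if first, ok := q.seen[cid]; ok {
		return first, false
	}
	pos = len(q.seen)
	q.seen[cid] = pos
	q.queued += size
	return pos, true
}

func main() {
	q := newQueuedTracker()
	blocks := []string{"cid-0", "cid-1", "cid-1"} // placeholder CIDs
	for _, c := range blocks {
		if first, fresh := q.OnOutgoingBlock(c, 1024); !fresh {
			fmt.Printf("Already saw this CID - it was sent in position %d\n", first)
		}
	}
	fmt.Println("queued bytes:", q.queued)
}
```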

Backwards compatibility for ipldbridge

Hello - so our project Space (https://github.com/fleekhq/space-daemon) is using both github.com/textileio/textile and go-ipfs. Textile brings in some Filecoin dependencies so there is now a conflict in the expected version of go-graphsync between Filecoin and go-ipfs.

The problem is that since ipldbridge was removed after v0.0.5 in this repo, go-ipfs v0.6.0 still depends on that. Meanwhile the Filecoin dependencies listed here need a newer version of go-graphsync:

463:github.com/filecoin-project/[email protected] github.com/ipfs/[email protected]
537:github.com/filecoin-project/[email protected] github.com/ipfs/[email protected]
671:github.com/filecoin-project/[email protected] github.com/ipfs/[email protected]

Are there plans to make this compatible with both Filecoin and IPFS? Apologies if this isn't the best place to post this, maybe it should be done on the Filecoin side but thought I'd start here and get some input before proceeding. Thanks in advance!

Feature Proposal: Simultaneous outgoing request limit

What

Add an option to limit the number of in progress outgoing requests.

Why

Currently, we have a limit on the number of incoming requests processed at the same time in the Responder implementation in Graphsync.

However, there are several cases where we'd like to limit the number of outgoing requests as well. It certainly makes sense as a protocol implementation to have these controls.

How

MaxInProgressRequests is really MaxInProgressIncomingRequests and will be renamed as such.

We will add MaxInProgressOutgoingRequests which will have the same effect for the requestor side.

We will also implement the requestor side with a PeerTaskQueue, meaning that requests will be balanced between peers, so that one slow peer does not cause congestion in the queue.
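To illustrate just the limiting part, here is a minimal sketch using a buffered channel as a counting semaphore. `outgoingLimiter` is a hypothetical name, and the per-peer balancing that a PeerTaskQueue would provide is not modeled:

```go
package main

import (
	"fmt"
	"sync"
)

// outgoingLimiter caps the number of in-progress outgoing requests.
// Hypothetical sketch; it does not balance between peers.
type outgoingLimiter struct {
	slots chan struct{}
}

func newOutgoingLimiter(maxInProgress int) *outgoingLimiter {
	return &outgoingLimiter{slots: make(chan struct{}, maxInProgress)}
}

// Start blocks until a slot is free.
func (l *outgoingLimiter) Start() { l.slots <- struct{}{} }

// TryStart reserves a slot without blocking and reports whether it succeeded.
func (l *outgoingLimiter) TryStart() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// Done releases a slot previously taken by Start or TryStart.
func (l *outgoingLimiter) Done() { <-l.slots }

func main() {
	limiter := newOutgoingLimiter(2) // plays the role of MaxInProgressOutgoingRequests
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			limiter.Start()
			defer limiter.Done()
			fmt.Printf("request %d in progress\n", n)
		}(i)
	}
	wg.Wait()
}
```

With a single shared semaphore, one slow peer can occupy every slot, which is the congestion problem the PeerTaskQueue is meant to avoid.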

For discussion

This implementation will support solutions for filecoin-project/lotus#7276. It will allow us to implement option 4 (N in progress simultaneous Storage Deals and M in progress simultaneous Retrieval Deals separately) for Miners. It is NOT a complete solution to the problem -- it sounds like there need to be some significantly more robust solutions that control the overall flow between transfer and sealing. However, it will be a good start for now.

Note that we are NOT implementing a single global transfer limit for now that encompasses both incoming and outgoing requests.

Response unpaused but never calls outgoing block hook

It seems that under heavy load, a Graphsync response to a request for data will sometimes get stuck in a paused state.

When I run the new lotus concurrent deals test TestDealCyclesConcurrent in the itest kit with concurrency of 32, the test consistently hangs.
It appears that all the retrieval deals complete except for one or two, and the logs for these incomplete deals indicate that the problem is in Graphsync's pausing mechanism.
The branches I used to produce this state are:

  • lotus exp/rebase-itests-refactor-kit
    Includes the new TestDealCyclesConcurrent test
  • go-fil-markets dirkmc/log-validate-pull
    Adds a log line that connects the deal ID to the channel ID so we can match logs at the go-fil-markets level to the go-data-transfer level
  • go-data-transfer dirkmc/log-outgoing-block
    Adds some logging in the Graphsync transport so we can see if the sent / outgoing block hooks are called

I run the concurrent deals test in lotus:

go test ./itests -run TestDealCyclesConcurrent -v

I grep the output for each deal ID until I find the deal that hangs. Then I look for the log line that connects the deal ID to the data transfer channel ID:

2021-06-21T15:03:29.259+0200	INFO	markets-rtvl-reval	requestvalidation/requestvalidation.go:78	validate pull for deal 1624280585293275020 for data transfer channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052

I use that to grep the logs for both the deal ID and the channel ID so as to get all relevant logs:

grep "1624280585293275020\|1624280585292810052" /tmp/out.txt
 
2021-06-21T15:03:29.128+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventOpen", "deal ID": "1624280585293275020", "state": "DealStatusNew", "message": ""}
2021-06-21T15:03:29.259+0200	INFO	dt-impl	impl/events.go:19	channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: opened
                                                                                                                                                             ?V)????u,} to channel ID 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052
2021-06-21T15:03:29.259+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventDealProposed", "deal ID": "1624280585293275020", "state": "DealStatusWaitForAcceptance", "message": ""}
2021-06-21T15:03:29.259+0200	INFO	dt_graphsync	graphsync/graphsync.go:476	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052, pull request queued, req=&{BCid:bafk2bzacedr7tnpc62gvo2wcaxswfab4yduzmkv44kodcxbnpxrhaqhdbidk6 Type:0 Paus:false Part:false Pull:true Stor:0xc01035b758 Vouch:0xc01035b788 VTyp:RetrievalDealProposal/1 XferID:1624280585292810052 RestartChannel:--0}
2021-06-21T15:03:29.259+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:520	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: received request for data (pull)
2021-06-21T15:03:29.259+0200	INFO	dt-impl	impl/events.go:300	channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: received new channel request from 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h
2021-06-21T15:03:29.259+0200	INFO	markets-rtvl-reval	requestvalidation/requestvalidation.go:78	validate pull for deal 1624280585293275020 for data transfer channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052
2021-06-21T15:03:29.390+0200	INFO	markets	loggers/loggers.go:30	retrieval provider event	{"name": "ProviderEventOpen", "deal ID": "1624280585293275020", "receiver": "12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h", "state": "DealStatusNew", "message": ""}
2021-06-21T15:03:29.668+0200	INFO	markets	loggers/loggers.go:30	retrieval provider event	{"name": "ProviderEventDealAccepted", "deal ID": "1624280585293275020", "receiver": "12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h", "state": "DealStatusUnsealing", "message": ""}
2021-06-21T15:03:29.736+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:570	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: pausing graphsync response
2021-06-21T15:03:29.736+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:969	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: received request for data
                                                                                                                                                             ?V)????u,} to channel ID 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052
2021-06-21T15:03:29.741+0200	INFO	markets	loggers/loggers.go:30	retrieval provider event	{"name": "ProviderEventUnsealComplete", "deal ID": "1624280585293275020", "receiver": "12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h", "state": "DealStatusUnsealed", "message": ""}
2021-06-21T15:03:29.805+0200	INFO	dt-impl	impl/impl.go:391	resume channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052
2021-06-21T15:03:29.805+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:1055	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: unpausing response
2021-06-21T15:03:29.805+0200	WARN	dt-impl	impl/impl.go:400	Error attempting to resume 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052 at transport level: request is not paused
2021-06-21T15:03:29.998+0200	INFO	dt-impl	impl/events.go:188	channel 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: received new response, accepting channel
2021-06-21T15:03:29.999+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventDealAccepted", "deal ID": "1624280585293275020", "state": "DealStatusAccepted", "message": ""}
2021-06-21T15:03:31.318+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventPaymentChannelAddingFunds", "deal ID": "1624280585293275020", "state": "DealStatusPaymentChannelAddingInitialFunds", "message": ""}
2021-06-21T15:03:32.511+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventPaymentChannelReady", "deal ID": "1624280585293275020", "state": "DealStatusPaymentChannelAllocatingLane", "message": ""}
2021-06-21T15:03:32.725+0200	INFO	markets	loggers/loggers.go:25	retrieval client event	{"name": "ClientEventLaneAllocated", "deal ID": "1624280585293275020", "state": "DealStatusOngoing", "message": ""}

Normally after ProviderEventUnsealComplete there would be a ProviderEventBlockSent event.

This log line indicates that hookActions.PauseResponse() is called when a new request for data arrives:

2021-06-21T15:03:29.736+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:570	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: pausing graphsync response

These log lines indicate that Graphsync.UnpauseResponse is called but that Graphsync does not believe the response is paused:

2021-06-21T15:03:29.805+0200	DEBUG	dt_graphsync	graphsync/graphsync.go:1055	12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052: unpausing response
2021-06-21T15:03:29.805+0200	WARN	dt-impl	impl/impl.go:400	Error attempting to resume 12D3KooWCGuvzoKipy5Y6DRGeecSH57R2hzS7xtrhFELef7CAY9h-12D3KooWLqudbUu45uQcJCh1eMk7GeH1ZesUGyUqqcpGLapSXxGs-1624280585292810052 at transport level: request is not paused

Therefore the Graphsync transfer remains in the paused state, the outgoing block hook is never called (see the log lines that would be output if it were called) and ProviderEventBlockSent is never fired.

Chore: GraphSyncMessage Can Serialize To A Protobuf

There is a programmatic interface to build a GraphSyncMessage from a series of Requests and Responses, and a method for serializing the message to a Protobuf

  • Build GraphSyncRequest & GraphSyncResponse struct
  • Build GraphSyncMessage struct
  • Build GraphSyncMessage protobuf schema
  • Build methods to serialize to proto

GraphSync user can create a GraphSyncNetwork from LibP2P Host

As a user of the go-graphsync client library, I can initialize a network with a libp2p host that can send GraphSync messages to other nodes so that I can communicate

  • Build network interface
  • Build libp2p Implementation

Acceptance Criteria:

  • Spin up two libp2p hosts with MockNet
  • Send a GraphSync message from one to another
  • Show that it was received

[flake] TestCancellationViaCommand

 === RUN   TestCancellationViaCommand
      responsemanager_test.go:108: 
          	Error Trace:	responsemanager_test.go:108
          	Error:      	Received unexpected error:
          	            	could not find request
          	Test:       	TestCancellationViaCommand
  --- FAIL: TestCancellationViaCommand (0.01s)

GraphSync user can validate a response from another node

As a user of the GraphSync client library, I want to be able to verify that a response to one of my queries that I receive from another node is correct, so that I am not vulnerable to malicious nodes

  • Augment ProcessResponses on the RequestManager to use the Validate method on go-selector

--

Acceptance Criteria:

  • Setup two nodes with Mocknet
  • Setup GraphSync on both nodes
  • Setup mock goselector to return a response to a query, but set up its Validate method to say parts of the query are not valid and filter the bad results
  • Send graphsync query from one node to the other
  • Verify only the valid responses are received by the requestor

Request Authentication/Validation Hooks

Create hooks to validate/authenticate Graphsync requests when received by responder

Approach:

  • Hook is added as a variadic option on startup
  • as a replacement for a default hook which will have a recursion depth limit
  • Hook is a function with the signature:
type RequestValidationFunc func(peer ID, root CID, selector Selector, extra Extra) error

nil = success
err = fail, message in err

(may finagle with return type)
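A sketch of how the approach above might fit together, assuming stand-in types for the peer ID, CID, selector and extra data. Every name except RequestValidationFunc is hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in types so the sketch compiles with the standard library only.
type PeerID string
type CID string
type Selector struct{ MaxDepth int } // MaxDepth < 0 models unbounded recursion
type Extra map[string][]byte

// RequestValidationFunc returns nil on success, or an error whose message
// explains the failure.
type RequestValidationFunc func(p PeerID, root CID, sel Selector, extra Extra) error

// DefaultRecursionLimit builds the default hook: reject selectors that
// recurse deeper than limit.
func DefaultRecursionLimit(limit int) RequestValidationFunc {
	return func(_ PeerID, _ CID, sel Selector, _ Extra) error {
		if sel.MaxDepth < 0 || sel.MaxDepth > limit {
			return errors.New("selector recursion depth exceeds limit")
		}
		return nil
	}
}

type responder struct{ validate RequestValidationFunc }

// Option is a variadic startup option.
type Option func(*responder)

// WithRequestValidation replaces the default hook.
func WithRequestValidation(fn RequestValidationFunc) Option {
	return func(r *responder) { r.validate = fn }
}

func newResponder(opts ...Option) *responder {
	r := &responder{validate: DefaultRecursionLimit(100)}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newResponder()
	fmt.Println(r.validate("peerA", "root", Selector{MaxDepth: 10}, nil))
	fmt.Println(r.validate("peerA", "root", Selector{MaxDepth: -1}, nil))

	allowAll := func(PeerID, CID, Selector, Extra) error { return nil }
	r = newResponder(WithRequestValidation(allowAll))
	fmt.Println(r.validate("peerA", "root", Selector{MaxDepth: -1}, nil))
}
```

Supporting multiple hooks, one of the open questions, could be handled by storing a slice of functions and stopping at the first error.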


open questions:

  • multiple validation hooks
  • default validation?
  • adding removing hooks after initialization

GraphSync can partially cancel a request

This isn't necessary for the MVP but GraphSync will eventually need some way to partially cancel a request. We can do this by either:

  1. Adding a way to tell a remote peer to not send some sub-dag rooted at a specific CID. This will require a protocol change but may be the easiest to implement.
  2. Making it possible to atomically cancel one selector and replace it with another without sending duplicate blocks. To do this:
    a. The requester would need to be able to create a selector for the remainder of an in-progress selector.
    b. The responder would need to keep track of the last N blocks (or just a bloom filter?) sent to the requester as the replacement selector will end up overlapping with the original selector.
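For option 2b, the "last N blocks" bookkeeping could be as simple as a bounded first-in-first-out set. The sketch below assumes CIDs are compared as strings, and `recentBlocks` is a hypothetical name. A bloom filter would trade exactness for memory:

```go
package main

import "fmt"

// recentBlocks remembers the last N distinct CIDs sent to a requester.
// Hypothetical helper for illustration only.
type recentBlocks struct {
	limit int
	order []string            // insertion order, oldest first
	set   map[string]struct{} // membership index
}

func newRecentBlocks(limit int) *recentBlocks {
	if limit < 1 {
		limit = 1
	}
	return &recentBlocks{limit: limit, set: make(map[string]struct{})}
}

// Add records cid, evicting the oldest entry when the limit is reached.
func (r *recentBlocks) Add(cid string) {
	if _, ok := r.set[cid]; ok {
		return
	}
	if len(r.order) == r.limit {
		oldest := r.order[0]
		r.order = r.order[1:]
		delete(r.set, oldest)
	}
	r.order = append(r.order, cid)
	r.set[cid] = struct{}{}
}

// Contains reports whether cid is among the last N blocks sent, in which
// case the replacement selector does not need to send it again.
func (r *recentBlocks) Contains(cid string) bool {
	_, ok := r.set[cid]
	return ok
}

func main() {
	recent := newRecentBlocks(2)
	for _, c := range []string{"cidA", "cidB", "cidC"} {
		recent.Add(c)
	}
	fmt.Println(recent.Contains("cidA")) // evicted
	fmt.Println(recent.Contains("cidC")) // still remembered
}
```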

Replace protobuf messages with IPLD

go-graphsync currently uses google protobuf for messaging. Since graphsync depends on IPLD and IPLD can be used in place of GPB, we can reduce dependencies, tooling and simplify the design by replacing GPB with IPLD. IPLD schema can be used to define the messages and dag-cbor could be used to encode them. @warpfork said this was previously discussed but cut due to time constraints - adding this issue for tracking

Chore: RequestManager can send requests out to the network

RequestManager can receive a request with a peer id, root node, and selector, and send the request out to the network

  • Build RequestManager struct
  • Build Request Method
  • Build mechanism for converting request to a GraphSyncMessage then sending to the network

GraphSync: MQ attemptSendAndRecovery retries have malformed length

Issue

I am encountering stuck filecoin deal transfers, as well as I/O timeouts from dealbot. Upon further investigation, it looks like GraphSync's attemptSendAndRecovery's resends have malformed lengths and continuously fail.

See below for the miner log. Note the Graphsync ReceiveError: stream reset error followed by exactly 20 Graphsync ReceiveError: short buffer errors.

2020-09-01T10:16:50.552Z        INFO markets loggers/loggers.go:19   storage event   {"name": "ProviderEventOpen", "proposal CID": "bafyreihib4js3vrpfrifibg2r76jkc5q4zjof4iyj2lwgxig2a5ihuvmau", "state": "StorageDealValidating", "message": ""}
2020-09-01T10:16:50.555Z        INFO markets loggers/loggers.go:19   storage event   {"name": "ProviderEventDealDeciding", "proposal CID": "bafyreihib4js3vrpfrifibg2r76jkc5q4zjof4iyj2lwgxig2a5ihuvmau", "state": "StorageDealAcceptWait", "message": ""}
2020-09-01T10:16:50.587Z        INFO markets loggers/loggers.go:19   storage event   {"name": "ProviderEventDataRequested", "proposal CID": "bafyreihib4js3vrpfrifibg2r76jkc5q4zjof4iyj2lwgxig2a5ihuvmau", "state": "StorageDealWaitingForData", "message": ""}
2020-09-01T10:16:50.828Z        INFO markets loggers/loggers.go:19   storage event   {"name": "ProviderEventDataTransferInitiated", "proposal CID": "bafyreihib4js3vrpfrifibg2r76jkc5q4zjof4iyj2lwgxig2a5ihuvmau", "state": "StorageDealTransferring", "message": ""}
2020-09-01T10:17:35.394Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: stream reset
2020-09-01T10:17:48.289Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:15.300Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:16.048Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:29.050Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:30.301Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:39.540Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:40.028Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:48.125Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:52.574Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:18:56.703Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:02.134Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:02.479Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:06.699Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:07.052Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:09.380Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:11.110Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:11.720Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:13.617Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:13.963Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer
2020-09-01T10:19:15.838Z        INFO graphsync       impl/graphsync.go:244   Graphsync ReceiveError: short buffer

Given that sendMessage will issue a reset and then attempt to retry 10 times, and that the 20 short buffer errors are a multiple of 10, it looks like a condition has caused incorrect buffer lengths to be repeatedly sent to the receiver for both messages. The retries all fail due to this error, and the connection is terminated.

Please note the timestamps -- the errors span 1 minute and 40 seconds.

Ruling out my internet connection

I have set up a remote connectivity tester, which opens a socket to my miner's port every minute. Logs are available here: http://139.180.162.246/readlog.php

Given that I am able to connect every minute and the short buffer errors span more than 1 minute, I think I can rule out my internet connection being down or messing up for the entire period of 1 min 40 seconds.

Furthermore, while anecdotal, I have no issues transferring terabyte files to the public internet over SCP, over days, without any interruptions, broken pipes, or resets.

My continuous SSH connections, from my laptop to the miner machine, have not been broken for more than 72 hours since I opened them.

GraphSync user can send a real query over a real network and receive a real response

As a Filecoin developer, I want to use LibP2P, GoGraphSync, and GoSelector libraries to send real graphsync queries over real networks so I can speed up queries for deeply nested nodes

  • Once go-selector is done, setup GoGraphSync to use the real version, either by translating interfaces or by renaming or changing interfaces as necessary inside of GoGraphSync

--

Will refine once go-selector is done and it's clearer how to integrate with FileCoin

GraphSync user can cancel a request that is in process

As a user of the GraphSync client library, I want to be able to cancel a graphsync request that is in progress, so that other nodes stop doing work and stop sending data over the network.

  • Calling cancel on a SelectorQuery returned by GraphSync triggers sending a cancel request over the network
  • Receiving a cancel message from the network in the ResponseManager cancels the SelectorQuery returned by GoSelector

--

Acceptance Criteria:

  • Setup two nodes connected with MockNet
  • Setup both nodes with GraphSync
  • Send a GraphSync query from one node to the other
  • Cancel the GraphSync query from the requestor node
  • Verify the responder node's SelectorQuery is cancelled

RFC: Graphsync External Interfaces for FileCoin

This document is meant to stimulate discussion and work towards a final proposal for the external interfaces that GraphSync provides that FileCoin will use.

Go-IPLD-Prime

go-graphsync relies on go-ipld-prime to traverse IPLD Selectors in an IPLD graph. go-ipld-prime is a complete rewrite of go IPLD libraries, and looks A LOT different than go-ipld-format and go-ipld-cbor. In order to use go-graphsync, some understanding and use of go-ipld-prime concepts is necessary. However, go-graphsync can provide some translation layers as necessary if FileCoin does not want to integrate go-ipld-prime in the rest of its code base

Initializing a GraphSync Exchange

Here is the current proposed signature for initializing GraphSync:

func New(ctx context.Context, 
        host libp2p.Host, 
        loader ipld.Loader)

Parameter Notes:

  1. context is just the parent context for all of GraphSync
  2. host is any libp2p host
  3. loader is used to load blocks from the local block store when RESPONDING to requests from other clients. See the IPLD loader interface: https://github.com/ipld/go-ipld-prime/blob/master/linking.go

How To Write A Loader From The Stuff You Know

Coming from a pre-go-ipld-prime world, you probably expect a link loading function signature to look like this:

type Cid2BlockFn func (lnk cid.Cid) (blocks.Block, error)

In go-ipld-prime, the signature for a link loader is as follows:

type Loader func(lnk Link, lnkCtx LinkContext) (io.Reader, error)

go-ipld-prime intentionally keeps its interfaces as abstract as possible to limit dependencies on other ipfs/filecoin specific packages. An IPLD Link is an abstraction for a CID, and IPLD expects io.Reader's rather than an actual block. IPLD provides a cidLink package for working with Links that use CIDs as the underlying data, and it's safe to assume that's the type in use if your code deals only with CIDs. Anyway, a conversion would look something like this:

import (
	"bytes"
	"fmt"
	"io"

	ipld "github.com/ipld/go-ipld-prime"
	cidLink "github.com/ipld/go-ipld-prime/linking/cid"
)

// LoaderFromCid2BlockFn adapts a CID-based block lookup to an ipld.Loader.
func LoaderFromCid2BlockFn(cid2BlockFn Cid2BlockFn) ipld.Loader {
	return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
		// Only CID-backed links can be translated to a block lookup.
		asCidLink, ok := lnk.(cidLink.Link)
		if !ok {
			return nil, fmt.Errorf("unsupported link type")
		}
		block, err := cid2BlockFn(asCidLink.Cid)
		if err != nil {
			return nil, err
		}
		// IPLD expects a reader over the raw block bytes.
		return bytes.NewReader(block.RawData()), nil
	}
}

Go GraphSync can provide the above as a utility function if necessary.

Calling Graphsync

graphSync.Request(ctx context.Context, p peer.ID, cidRootedSelector Node) (<-chan ResponseProgress, <-chan error)

Parameter Notes:

  1. ctx is the context for this request. To cancel an in progress request, cancel the context.
  2. p is the peer you will send this request to
  3. cidRootedSelector is a go-ipld-prime node

Building a path selector

A rooted selector is a go-ipld-prime node that follows the spec outlined here: ipld/specs#95

go-ipld-prime provides a series of builder interfaces for building this kind of structured data into a node. Assuming, in Filecoin's case, you are beginning with a CID and a path, represented by an array of strings, you could construct the node as follows:

import (
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipld/go-ipld-prime"
	free "github.com/ipld/go-ipld-prime/impl/free"
	fluent "github.com/ipld/go-ipld-prime/fluent"
	cidLink "github.com/ipld/go-ipld-prime/linking/cid"
)

// SelectorSpecFromCidAndPath builds a rooted selector node from a root CID
// and a list of path segments.
func SelectorSpecFromCidAndPath(lnk cid.Cid, pathSegments []string) (ipld.Node, error) {
	var node ipld.Node
	// fluent builders panic on error; Recover turns the panic into an error.
	err := fluent.Recover(func() {
		builder := fluent.WrapNodeBuilder(free.NodeBuilder())
		node = builder.CreateMap(func(mb fluent.MapBuilder, knb fluent.NodeBuilder, vnb fluent.NodeBuilder) {
			mb.Insert(knb.CreateString("root"), vnb.CreateLink(cidLink.Link{lnk}))
			mb.Insert(knb.CreateString("selectors"),
				vnb.CreateList(func(lb fluent.ListBuilder, vnb fluent.NodeBuilder) {
					// One selectPath entry per path segment, in order.
					for _, pathSegment := range pathSegments {
						lb.Append(vnb.CreateMap(
							func(mb fluent.MapBuilder, knb fluent.NodeBuilder, vnb fluent.NodeBuilder) {
								mb.Insert(knb.CreateString("selectPath"), vnb.CreateString(pathSegment))
							},
						))
					}
				}))
		})
	})
	if err != nil {
		return nil, err
	}
	return node, nil
}

go-graphsync can provide this as a utility function

Response Type

Outstanding Question 1: Nodes or Blocks?

Does GraphSync return go-ipld-prime nodes, or blocks? This is a tough question. The advantage of nodes is that they are way more informative and represent structured data. They are also the data type that go-ipld-prime's selectors work with. At the same time, returning Nodes means that the GraphSync client really needs to understand go-ipld-prime and integrate with it. Right now, neither Filecoin nor go-ipfs integrates with go-ipld-prime.

The other integration points are:

  1. The initialization parameters, which may not be necessary to begin with (see above), and are otherwise easy to assemble from the go-ipld-prime library
  2. Building selectorSpecs -- this seems unavoidable, but is also a novel use case that does not necessarily require changing a lot of code to use go-ipld-prime elsewhere in the client library

By contrast, if a selection returns Nodes, the GraphSync client really needs to understand them, particularly because nodes are not necessarily a whole block but can exist inside a block.

Seeing "ProcessResponses: %!s(<nil>)" warning in Lotus markets logs using GS 0.9.1

With Lotus markets v1.11.3-rc1 and graphsync v0.9.1, the logs show these warnings during data transfers. They do not seem to affect the transfers.

2021-09-09T20:00:12.172Z WARN graphsync requestmanager/requestmanager.go:290 ProcessResponses: %!s(<nil>)
2021-09-09T20:00:12.371Z WARN graphsync requestmanager/requestmanager.go:290 ProcessResponses: %!s(<nil>)
2021-09-09T20:00:12.490Z WARN graphsync requestmanager/requestmanager.go:290 ProcessResponses: %!s(<nil>)
2021-09-09T20:00:12.611Z WARN graphsync requestmanager/requestmanager.go:290 ProcessResponses: %!s(<nil>)
2021-09-09T20:00:12.726Z WARN graphsync requestmanager/requestmanager.go:290 ProcessResponses: %!s(<nil>)

The Lotus client is Estuary.

incoming / outgoing / sent block hooks called again for each block on restart

There are a couple of bugs open against lotus whereby the "transferred" field increases by the already sent/received data size each time a transfer is restarted: filecoin-project/lotus#5225 filecoin-project/lotus#5133

I believe the root cause is that when a data transfer is restarted, graphsync triggers the hooks below for all blocks that have already been sent or received:

  • RegisterIncomingBlockHook
  • RegisterOutgoingBlockHook
  • RegisterBlockSentListener

go-data-transfer also watches for these events in order to detect whether a reconnect has occurred.

I think it would make most sense for these events to only be triggered when the block has actually been sent / received over the network.
Alternatively the event could still fire, but the event data could include a flag indicating whether the block was sent / received over the network.

Graphsync User Can Make a GraphSync Request to the network

As a user of the graphsync client library, I can send a GraphSync query to another peer over the network, so that peer can process my request

  • Setup GraphSync top level struct
  • Build Request Method that calls RequestManager

--
Acceptance Criteria:

  • Setup two hosts connected via MockNet
  • Setup GraphSync on one peer
  • Make a GraphSync query to other peer
  • Verify network message sent to second peer
