精通IPFS:IPFS 获取内容之下篇
streamBytes
函数,根据偏移量、长度及节点的连接数组,获取指定的内容。在获取文件过程中,通过调用
streamBytes
函数来获取整个文件的内容。
streamBytes
函数调用完成后返回的
pull
函数生成的 through 流就是我们要读取内容的流,它最终被传递给 IPFS
core/components/get-pull-stream.js
中返回的
pull
函数生成的 through 流,而这个流又被同目录下
get.js
文件中 pull-stream 类库中的
asyncMap
流的处理函数转换为完整的缓冲区,从而被最终的应用的所使用,这段程序代码如下:
上篇文章的内容,我们回忆到这里就结束了,下面我们仔细研究
pull(
self.getPullStream(ipfsPath, options),
pull.asyncMap((file, cb) => {
if (file.content) {
pull(
file.content,
pull.collect((err, buffers) => {
if (err) { return cb(err) }
file.content = Buffer.concat(buffers)
cb(null, file)
})
)
} else {
cb(null, file)
}
}),
pull.collect(callback)
)
streamBytes
函数及相关的深度遍历是如何实现的。
streamBytes
函数使用了 pull-traverse 类库提供深度优先、广度优先、叶子优先等算法,它的每个算法都返回一个 pull 类库的
through
流,这个流被它后面的流所调用。在这里使用深度优先算法,返回的流被 pull 类库的
map
流所调用,用于获取每一个元素。
深度优先算法的相关代码如下:
var once = exports.once =
function (value) {
return function (abort, cb) {
if(abort) return cb(abort)
if(value != null) {
var _value = value; value = null
cb(null, _value)
} else
cb(true)
}
}var depthFirst = exports.depthFirst =
function (start, createStream) {
var reads = [], ended
reads.unshift(once(start))
return function next (end, cb) {
if(!reads.length)
return cb(true)
if(ended)
return cb(ended)
reads[0](end, function (end, data) {
if(end) {
if(end !== true) {
ended = end
reads.shift()
while(reads.length)
reads.shift()(end, function () {})
return cb(end)
}
reads.shift()
return next(null, cb)
}
reads.unshift(createStream(data))
cb(end, data)
})
}
}
streamBytes
函数定义在
file.js
文件中,我们来看下它的内容:
function streamBytes (dag, node, fileSize, offset, length) {
if (offset === fileSize || length === 0) {
return once(Buffer.alloc(0))
} const end = offset + length
return pull(
traverse.depthFirst({
node,
start: 0,
end: fileSize
}, getChildren(dag, offset, end)),
map(extractData(offset, end)),
filter(Boolean)
)
}
根据深度优先算法代码我们可知道,它首先把第一个参数包装成一个 pull 类库的
once
流,在这里即把我们的根 DAG 节点包装成一个
once
流,然后作为内部数组的第一个元素,最后返回 pull 类库的
through
流。
我们把返回类似当 pull 类库的function next (end, cb) {}
签名的函数称为 pull 类库的through
流,这个函数被称为读函数。因为它会被后面的流所调用,用来从流中读取数据,当读取数据之后,这个函数通过参数中指定的回调函数把数据传递给后面的流,即传递给调用自己的流。
map
函数返回的
through
流调用深度遍历函数所返回的读取函数时,该读取函数执行如下:
-
如果内部数组中没有数据可以读取,那么调用
map
函数返回的through
流的读取函数,并返回。if(!reads.length) return cb(true)
-
如果
ended
变量为真,那么以这个变量为参数调用map
函数返回的through
流的读取函数,并返回。if(ended) return cb(ended)
-
最后,调用内部数组的第一个元素(类型为 pull 类库的
through
流的读取函数)来读取数据。当读取到数据之后,调用自定义的内部函数来处理数据,在这个内部函数中处理如下:-
如果当前读取完成,即
end
为真时,执行下面逻辑。如果end
不是严格真(出现在变量为字符串true
情况),那么:设置变量ended
为end
的值;删除数组中的第一个元素;如果数组长度不为0,那么持续删除第一个元素(类型为函数),并且调用这个删除的元素;当数组长度为空时,调用回调函数进行处理。否则,即end
严格为真,从数组中删除第一个元素,因为这意味着数组的当前元素的已经处理完成,所以需要调用外层函数继续从数组中读取数据。if(end) { if(end !== true) { ended = end reads.shift()
}while(reads.length) reads.shift()(end, function () {})
return cb(end) }
reads.shift() return next(null, cb)
-
调用
createStream
函数来处理读取到的数据,这个createStream
函数即是我们提供给深度遍历算法的第二个参数getChildren
函数返回的内部函数。getChildren
函数返回的内部函数最终会返回一个pull
函数生成的 through 流。在这个流中通过 pull-stream 类库的flatten
流会把获取到的每个节点及其内部节点最终转换成一个数组形式,比如把
这样的形式转化成下面的形式[1, 2, 3], [4, 5, 6], [7, 8]
这里 [1, 2, 3] 可以认为是第一个 Link 碎片,它下面又有三个包含最终数据的 DAG 节点;[4, 5, 6] 是第二个 Link 碎片,它下面也有三个包含最终数据的 DAG 节点; [7, 8] 是第三个 Link 碎片,它下面只有二个包含最终数据的 DAG 节点。[1, 2, 3, 4, 5, 6, 7, 8]
我们可以发现,通过深度遍历优先算法及其处理函数
getChildren
返回的内部函数流,我们会分别获取每个碎片及其保存的子碎片,并且把它们以正确的顺序排列在一起形成数组,从而最终获取到了 DAG 节点的完整数据。
-
如果当前读取完成,即
getChildren
函数返回的内部函数处理方法如下:
-
如果当前节点对象是一个缓冲区对象,即当前节点是一个叶子节点,那么直接返回一个空的流,因为没有办法再次进行遍历。
if (Buffer.isBuffer(node)) { return empty() }
-
调用静态方法把当前节点对象转换成一个文件对象。
let file
try { file = UnixFS.unmarshal(node.data) } catch (err) { return error(err) }
-
判断流的开始位置。
const nodeHasData = Boolean(file.data && file.data.length) if (nodeHasData && node.links.length) { streamPosition += file.data.length }
-
处理当前节点包含的 Link 信息,并过滤掉不在指定范围内的 Link 信息,然后按顺序返回 Link 信息数组。
const filteredLinks = node.links .map((link, index) => { const child = { link: link, start: streamPosition, end: streamPosition + file.blockSizes[index], size: file.blockSizes[index] }
streamPosition = child.end return child
}) .filter((child) => { return (offset >= child.start && offset < child.end) || // child has offset byte (end > child.start && end <= child.end) || // child has end byte (offset < child.start && end > child.end) // child is between offset and end bytes })
-
如果最终返回的 Link 信息数组存在,则设置流的起始位置为第一个 Link 信息的开头位置。
if (filteredLinks.length) { streamPosition = filteredLinks[0].start }
-
返回一个
pull
函数构成的流。return pull( once(filteredLinks), paramap((children, cb) => { dag.getMany(children.map(child => child.link.cid), (err, results) => { if (err) { return cb(err) }
cb(null, results.map((result, index) => { const child = children[index]
return { start: child.start, end: child.end, node: result, size: child.size } })) })
在这个流中,}), flatten() )
paramap
函数返回的流会调用once
函数返回的一次性流,once
函数返回的一次性流会把 Link 信息数组传递给前者。而前者的处理函数会对 Link 信息数组中的每个碎片进行处理(这里只有一个 Link 信息数组,即只有children
数组,而不是多个children
数组,但是children
数组包含了所有 Link 信息)。在paramap
函数返回的流的处理函数中调用 IPLD 对象的getMany
获取每个 Link 节点的数据,并对返回的数据进行整理,然后以整理后的数组为参数,调用下一个流---即flatten
流的读取函数中指定的---回调函数,把最终的数组传递给它。最终,数组被flatten
流进行扁平化处理后,传递给外部的pull
函数中的流,即前面所看到的 pull 类库的map
流的read
函数中指定的那个函数,在这个函数中又会调用我们提供的extractData
函数返回的内部函数来处理每一个节点对象。
extractData
函数返回的内部函数比较简单,主要是对获取到的每个碎片数据进行处理,然后返回对应的数组,它的代码如下,读者可自行分析,这里不再细讲。
function getData ({ node, start, end }) {
let block if (Buffer.isBuffer(node)) {
block = node
} else {
try {
const file = UnixFS.unmarshal(node.data)
if (!file.data) {
if (file.blockSizes.length) {
return
}
return Buffer.alloc(0)
}
block = file.data
} catch (err) {
throw new Error(`Failed to unmarshal node - ${err.message}`)
}
}
if (block && block.length) {
if (streamPosition === -1) {
streamPosition = start
}
const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd)
streamPosition += block.length
return output
}
return Buffer.alloc(0)
}
위믹스3.0 최초의 DAO ‘원더다오’, 노드 카운슬 파트너 합류
위믹스3.0 최초의 DAO ‘원더다오’, 노드 카운슬 파트너 합류 l 탈중앙화 자율조직 형태로 결성…구성원 모두가 위믹스3.0 운영에 참여 가능l 스마트 컨트랙트 기반 모듈화 ...
[주간톡톡] 양의 탈을 쓴 늑대는 양일까? 늑대일까?
주간톡톡은 한주간의 블록체인 소식을 재구성해 독자들과 재미있게 이야기해보는 코너입니다. 이번주는 페이스북의 스테이블 코인 프로젝트 '디엠(Diem)'에 대해 알아보겠습니다. 그럼 ...
IBM, 블록체인 와인 추적 서비스 '빈어슈어(Assure)' 공개
IBM이 재배지부터 매장까지 와인 공급망을 추적할 수 있는 블록체인 기반 플랫폼을 공개했다.10일(현지시간) 발표에 따르면 IBM은 와인 모니터링 업체 e프로브넌스(eProven...