Skip to content
56 changes: 54 additions & 2 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ DHT.prototype._put = function (opts, cb) {

var table = this._tables.get(key.toString('hex'))
if (!table) return this._preput(key, opts, cb)
if (opts.backoff && this._backoff(opts, table)) return cb(null, key)

var message = {
q: 'put',
Expand Down Expand Up @@ -202,6 +203,52 @@ DHT.prototype._put = function (opts, cb) {
return key
}

DHT.prototype._backoff = function (opts, table) {
var self = this

var v = typeof opts.v === 'string' ? Buffer.from(opts.v) : opts.v
var isMutable = !!opts.k
var key = isMutable
? sha1(opts.salt ? Buffer.concat([opts.salt, opts.k]) : opts.k)
: sha1(bencode.encode(v))

var MAX_COPIES = 8

// For mutable items only nodes holding values with the most
// recent known sequence number count towards meeting these conditions
function filterMaxSeq (arr) {
var max = arr.map(function (r) {
return r.seq || 0
}).reduce(function (p, c) {
return p > c ? p : c
}, 0)
arr = arr.filter(function (r) {
return r.seq !== undefined && r.seq === max
})
.filter(function (r) {
var val = typeof r.v === 'string' ? Buffer.from(r.v) : r.v
return val && equals(v, val) // only if it equals input
})
return arr
}

var copies = table.toArray()
copies = filterMaxSeq(copies)

// The 8 nodes closest to the target key which are eligible for a store
// all have indicated they have the data, either by returning it or through the seq number.
var closest = table.closest(key)
closest = filterMaxSeq(closest)

if (copies.length > MAX_COPIES && // They find more than 8 copies of the value
closest.length > MAX_COPIES) {
self._debug('backing off, found %s copies AND %s closest nodes holding data', copies.length, closest.length)
return true
}

return false
}

DHT.prototype._preput = function (key, opts, cb) {
var self = this

Expand Down Expand Up @@ -541,12 +588,17 @@ DHT.prototype._closest = function (target, message, onmessage, cb) {

if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === 20) {
self._debug('found node %s (target: %s)', message.r.id, target)
table.add({
var o = {
id: message.r.id,
host: node.host || node.address,
port: node.port,
token: message.r.token
})
}
if (message.r.v) {
o.v = message.r.v
o.seq = message.r.seq
}
table.add(o)
}

if (!onmessage) return true
Expand Down
162 changes: 162 additions & 0 deletions test/dht_backoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
var common = require('./common')
var DHT = require('../')
var ed = require('ed25519-supercop')
var test = require('tape')

test('backoff algorithm with > 8 nodes', function (t) {
// dht1 <---
// |---> 9 nodes
// dht2 <---

t.plan(2)

var nodes = 9
var dhts = []
var pending = 2

var dht1 = new DHT({ bootstrap: false, verify: ed.verify })
var dht2 = new DHT({ bootstrap: false, verify: ed.verify })

common.failOnWarningOrError(t, dht1)
common.failOnWarningOrError(t, dht2)

dht1.listen(listen)
dht2.listen(listen)

function listen () {
if (--pending !== 0) return
for (var i = 0; i < nodes; i++) {
(function (i) {
var d = new DHT({ bootstrap: false, verify: ed.verify })
dhts.push(d)
common.failOnWarningOrError(t, d)
pending++
d.listen(function () {
if (--pending === 0) addNodes()
})
})(i)
}
}

function addNodes () {
var pending = dhts.length
dhts.forEach(function (d) {
d.addNode({ host: '127.0.0.1', port: dht1.address().port })
d.addNode({ host: '127.0.0.1', port: dht2.address().port })
d.once('node', function () {
if (--pending === 0) ready()
})
})
}
t.once('end', function () {
dht1.destroy()
dht2.destroy()
for (var i = 0; i < dhts.length; i++) {
dhts[i].destroy()
}
})

function ready () {
var keypair = ed.createKeyPair(ed.createSeed())
var value = common.fill(500, 'abc')
var opts = {
k: keypair.publicKey,
sign: common.sign(keypair),
seq: 0,
v: value,
backoff: true
}

dht1.put(opts, function (err, hash) {
t.error(err)
opts.seq++
var onput = dht1._onput // save the onput function
dht1._onput = function () {
t.fail('shouldn\'t put')
onput.apply(dht1, arguments) // call the original onput function
}
dht2.put(opts, function (err, hash) {
t.error(err)
})
})
}
})

test('backoff algorithm with < 8 nodes', function (t) {
// dht1 <---
// |---> 4 nodes
// dht2 <---

t.plan(3)

var nodes = 4
var dhts = []
var pending = 2

var dht1 = new DHT({ bootstrap: false, verify: ed.verify })
var dht2 = new DHT({ bootstrap: false, verify: ed.verify })

common.failOnWarningOrError(t, dht1)
common.failOnWarningOrError(t, dht2)

dht1.listen(listen)
dht2.listen(listen)

function listen () {
if (--pending !== 0) return
for (var i = 0; i < nodes; i++) {
(function (i) {
var d = new DHT({ bootstrap: false, verify: ed.verify })
dhts.push(d)
common.failOnWarningOrError(t, d)
pending++
d.listen(function () {
if (--pending === 0) addNodes()
})
})(i)
}
}

function addNodes () {
var pending = dhts.length
dhts.forEach(function (d) {
d.addNode({ host: '127.0.0.1', port: dht1.address().port })
d.addNode({ host: '127.0.0.1', port: dht2.address().port })
d.once('node', function () {
if (--pending === 0) ready()
})
})
}
t.once('end', function () {
dht1.destroy()
dht2.destroy()
for (var i = 0; i < dhts.length; i++) {
dhts[i].destroy()
}
})

function ready () {
var keypair = ed.createKeyPair(ed.createSeed())
var value = common.fill(500, 'abc')
var opts = {
k: keypair.publicKey,
sign: common.sign(keypair),
seq: 0,
v: value,
backoff: true
}

dht1.put(opts, function (err, hash) {
t.error(err)
opts.seq++
var onput = dht1._onput // save the onput function
dht1._onput = function () {
t.pass('should put')
onput.apply(dht1, arguments) // call the original onput function
}
dht2.put(opts, function (err, hash) {
t.error(err)
})
})
}
})