508 lines
14 KiB
JavaScript
508 lines
14 KiB
JavaScript
var fs = require('fs')
|
|
var pull = require('pull-stream')
|
|
var multicb = require('multicb')
|
|
var pl = require('pull-level')
|
|
var pushable = require('pull-pushable')
|
|
var paramap = require('pull-paramap')
|
|
var cat = require('pull-cat')
|
|
var Notify = require('pull-notify')
|
|
var toPull = require('stream-to-pull-stream')
|
|
var ref = require('ssb-ref')
|
|
var pathlib = require('path')
|
|
var u = require('./util')
|
|
|
|
exports.name = 'patchwork'
|
|
exports.version = '1.0.0'
|
|
exports.manifest = require('./manifest')
|
|
exports.permissions = require('./permissions')
|
|
|
|
exports.init = function (sbot, opts) {
|
|
|
|
var api = {}
|
|
var phoenixdb = sbot.sublevel('patchwork')
|
|
var db = {
|
|
isread: phoenixdb.sublevel('isread'),
|
|
subscribed: phoenixdb.sublevel('subscribed')
|
|
}
|
|
var state = {
|
|
// indexes (lists of {key:, ts:})
|
|
mymsgs: [],
|
|
home: u.index(), // also has `.isread`
|
|
inbox: u.index(), // also has `.isread` and `.author`
|
|
votes: u.index(), // also has `.isread`, `.vote`, and `.votemsg`
|
|
myvotes: u.index(), // also has `.vote`
|
|
follows: u.index(), // also has `.isread` and `.following`
|
|
|
|
// views
|
|
profiles: {},
|
|
sites: {},
|
|
names: {}, // ids -> names
|
|
ids: {}, // names -> ids
|
|
actionItems: {}
|
|
}
|
|
|
|
var processor = require('./processor')(sbot, db, state, emit)
|
|
pull(pl.read(sbot.sublevel('log'), { live: true, onSync: onPrehistorySync }), pull.drain(processor))
|
|
|
|
// track sync state
|
|
// - processor does async processing for each message that comes in
|
|
// - awaitSync() waits for that processing to finish
|
|
// - pinc() on message arrival, pdec() on message processed
|
|
// - nP === 0 => all messages processed
|
|
var nP = 0, syncCbs = []
|
|
function awaitSync (cb) {
|
|
if (nP > 0)
|
|
syncCbs.push(cb)
|
|
else cb()
|
|
}
|
|
state.pinc = function () { nP++ }
|
|
state.pdec = function () {
|
|
nP--
|
|
if (nP === 0) {
|
|
syncCbs.forEach(function (cb) { cb() })
|
|
syncCbs.length = 0
|
|
}
|
|
}
|
|
|
|
var isPreHistorySynced = false // track so we dont emit events for old messages
|
|
// grab for history sync
|
|
state.pinc()
|
|
function onPrehistorySync () {
|
|
console.log('Log history read...')
|
|
// when all current items finish, consider prehistory synced (and start emitting)
|
|
awaitSync(function () {
|
|
console.log('Indexes generated')
|
|
isPreHistorySynced = true
|
|
})
|
|
// release
|
|
state.pdec()
|
|
}
|
|
|
|
// events stream
|
|
var notify = Notify()
|
|
function emit (type, data) {
|
|
if (!isPreHistorySynced)
|
|
return
|
|
var e = data || {}
|
|
e.type = type
|
|
if (e.type == 'index-change') {
|
|
api.getIndexCounts(function (err, counts) {
|
|
e.total = counts[e.index]
|
|
e.unread = counts[e.index+'Unread']
|
|
notify(e)
|
|
})
|
|
} else
|
|
notify(e)
|
|
}
|
|
|
|
// getters
|
|
|
|
api.createEventStream = function () {
|
|
return notify.listen()
|
|
}
|
|
|
|
api.getPaths = function (cb) {
|
|
cb(null, {
|
|
site: pathlib.join(opts.path, 'publish')
|
|
})
|
|
}
|
|
|
|
api.getMyProfile = function (cb) {
|
|
awaitSync(function () {
|
|
api.getProfile(sbot.id, cb)
|
|
})
|
|
}
|
|
|
|
function isInboxFriend (row) {
|
|
if (row.author == sbot.id) return true
|
|
var p = state.profiles[sbot.id]
|
|
if (!p) return false
|
|
return p.assignedTo[row.author] && p.assignedTo[row.author].following
|
|
}
|
|
|
|
api.getIndexCounts = function (cb) {
|
|
awaitSync(function () {
|
|
cb(null, {
|
|
inbox: state.inbox.rows.filter(isInboxFriend).length,
|
|
inboxUnread: state.inbox.filter(function (row) { return isInboxFriend(row) && row.author != sbot.id && !row.isread }).length,
|
|
votes: state.votes.filter(function (row) { return row.vote > 0 }).length,
|
|
votesUnread: state.votes.filter(function (row) { return row.vote > 0 && !row.isread }).length,
|
|
follows: state.follows.filter(function (row) { return row.following }).length,
|
|
followsUnread: state.follows.filter(function (row) { return row.following && !row.isread }).length,
|
|
home: state.home.rows.length
|
|
})
|
|
})
|
|
}
|
|
|
|
api.createInboxStream = indexStreamFn(state.inbox, function (row) {
|
|
if (!isInboxFriend(row)) return false
|
|
return row.key
|
|
})
|
|
api.createVoteStream = indexStreamFn(state.votes, function (row) {
|
|
if (row.vote <= 0) return false
|
|
return row.votemsg
|
|
})
|
|
api.createMyvoteStream = indexStreamFn(state.myvotes, function (row) {
|
|
if (row.vote <= 0) return false
|
|
return row.key
|
|
})
|
|
api.createFollowStream = indexStreamFn(state.follows)
|
|
api.createHomeStream = indexStreamFn(state.home)
|
|
|
|
function indexMarkRead (indexname, key, keyname) {
|
|
if (Array.isArray(key)) {
|
|
key.forEach(function (k) {
|
|
indexMarkRead(indexname, k, keyname)
|
|
})
|
|
return
|
|
}
|
|
|
|
var index = state[indexname]
|
|
var row = index.find(key, keyname)
|
|
if (row) {
|
|
var wasread = row.isread
|
|
row.isread = true
|
|
if (!wasread)
|
|
emit('index-change', { index: indexname })
|
|
}
|
|
}
|
|
|
|
function indexMarkUnread (indexname, key, keyname) {
|
|
if (Array.isArray(key)) {
|
|
key.forEach(function (k) {
|
|
indexMarkUnread(indexname, k, keyname)
|
|
})
|
|
return
|
|
}
|
|
|
|
var index = state[indexname]
|
|
var row = index.find(key, keyname)
|
|
if (row) {
|
|
var wasread = row.isread
|
|
row.isread = false
|
|
if (wasread)
|
|
emit('index-change', { index: indexname })
|
|
}
|
|
}
|
|
|
|
api.markRead = function (key, cb) {
|
|
indexMarkRead('inbox', key)
|
|
indexMarkRead('votes', key, 'votemsg')
|
|
indexMarkRead('follows', key)
|
|
if (Array.isArray(key))
|
|
db.isread.batch(key.map(function (k) { return { type: 'put', key: k, value: 1 }}), cb)
|
|
else
|
|
db.isread.put(key, 1, cb)
|
|
}
|
|
api.markUnread = function (key, cb) {
|
|
indexMarkUnread('inbox', key)
|
|
indexMarkUnread('votes', key, 'votemsg')
|
|
indexMarkUnread('follows', key)
|
|
if (Array.isArray(key))
|
|
db.isread.batch(key.map(function (k) { return { type: 'del', key: k }}), cb)
|
|
else
|
|
db.isread.del(key, cb)
|
|
}
|
|
api.toggleRead = function (key, cb) {
|
|
api.isRead(key, function (err, v) {
|
|
if (!v) {
|
|
api.markRead(key, function (err) {
|
|
cb(err, true)
|
|
})
|
|
} else {
|
|
api.markUnread(key, function (err) {
|
|
cb(err, false)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
api.isRead = function (key, cb) {
|
|
if (Array.isArray(key)) {
|
|
var done = multicb({ pluck: 1 })
|
|
key.forEach(function (k, i) {
|
|
var cb = done()
|
|
db.isread.get(k, function (err, v) { cb(null, !!v) })
|
|
})
|
|
done(cb)
|
|
} else {
|
|
db.isread.get(key, function (err, v) {
|
|
cb && cb(null, !!v)
|
|
})
|
|
}
|
|
}
|
|
|
|
api.subscribe = function (key, cb) {
|
|
db.subscribed.put(key, 1, cb)
|
|
}
|
|
api.unsubscribe = function (key, cb) {
|
|
db.subscribed.del(key, cb)
|
|
}
|
|
api.toggleSubscribed = function (key, cb) {
|
|
api.isSubscribed(key, function (err, v) {
|
|
if (!v) {
|
|
api.subscribe(key, function (err) {
|
|
cb(err, true)
|
|
})
|
|
} else {
|
|
api.unsubscribe(key, function (err) {
|
|
cb(err, false)
|
|
})
|
|
}
|
|
})
|
|
}
|
|
api.isSubscribed = function (key, cb) {
|
|
db.subscribed.get(key, function (err, v) {
|
|
cb && cb(null, !!v)
|
|
})
|
|
}
|
|
|
|
api.addFileToBlobs = function (path, cb) {
|
|
pull(
|
|
toPull.source(fs.createReadStream(path)),
|
|
sbot.blobs.add(function (err, hash) {
|
|
if (err)
|
|
cb(err)
|
|
else {
|
|
var ext = pathlib.extname(path)
|
|
if (ext == '.png' || ext == '.jpg' || ext == '.jpeg') {
|
|
var res = getImgDim(path)
|
|
res.hash = hash
|
|
cb(null, res)
|
|
} else
|
|
cb(null, { hash: hash })
|
|
}
|
|
})
|
|
)
|
|
}
|
|
api.saveBlobToFile = function (hash, path, cb) {
|
|
pull(
|
|
sbot.blobs.get(hash),
|
|
toPull.sink(fs.createWriteStream(path), cb)
|
|
)
|
|
}
|
|
function getImgDim (path) {
|
|
var NativeImage = require('native-image')
|
|
var ni = NativeImage.createFromPath(path)
|
|
return ni.getSize()
|
|
}
|
|
|
|
var lookupcodeRegex = /(@[a-z0-9\/\+\=]+\.[a-z0-9]+)(?:\[via\])?(.+)?/i
|
|
api.useLookupCode = function (code) {
|
|
var eventPush = pushable()
|
|
|
|
// parse and validate the code
|
|
var id, addrs
|
|
var parts = lookupcodeRegex.exec(code)
|
|
var valid = true
|
|
if (parts) {
|
|
id = parts[1]
|
|
addrs = (parts[2]) ? parts[2].split(',') : []
|
|
|
|
// validate id
|
|
if (!ref.isFeedId(id))
|
|
valid = false
|
|
|
|
// parse addresses
|
|
addrs = addrs
|
|
.map(function (addr) {
|
|
addr = addr.split(':')
|
|
if (addr.length === 3)
|
|
return { host: addr[0], port: +addr[1], key: addr[2] }
|
|
})
|
|
.filter(Boolean)
|
|
} else
|
|
valid = false
|
|
|
|
if (!valid) {
|
|
eventPush.push({ type: 'error', message: 'Invalid lookup code' })
|
|
eventPush.end()
|
|
return eventPush
|
|
}
|
|
|
|
// begin the search!
|
|
search(addrs.concat(sbot.gossip.peers()))
|
|
function search (peers) {
|
|
var peer = peers.pop()
|
|
if (!peer)
|
|
return eventPush.end()
|
|
|
|
// connect to the peer
|
|
eventPush.push({ type: 'connecting', addr: peer })
|
|
sbot.connect(peer, function (err, rpc) {
|
|
if (err) {
|
|
eventPush.push({ type: 'error', message: 'Failed to connect', err: err })
|
|
return search(peers)
|
|
}
|
|
// try a sync
|
|
sync(rpc, function (err, seq) {
|
|
if (seq > 0) {
|
|
// success!
|
|
eventPush.push({ type: 'finished', seq: seq })
|
|
eventPush.end()
|
|
} else
|
|
search(peers) // try next
|
|
})
|
|
})
|
|
}
|
|
|
|
function sync (rpc, cb) {
|
|
// fetch the feed
|
|
var seq
|
|
eventPush.push({ type: 'syncing', id: id })
|
|
pull(
|
|
rpc.createHistoryStream({ id: id, keys: false }),
|
|
pull.through(function (msg) {
|
|
seq = msg.sequence
|
|
}),
|
|
sbot.createWriteStream(function (err) {
|
|
cb(err, seq)
|
|
})
|
|
)
|
|
}
|
|
|
|
return eventPush
|
|
}
|
|
|
|
api.getSite = function (id, cb) {
|
|
awaitSync(function () { cb(null, state.sites[id]) })
|
|
}
|
|
|
|
var sitePathRegex = /(@.*\.ed25519)(.*)/
|
|
api.getSiteLink = function (url, cb) {
|
|
awaitSync(function () {
|
|
// parse url
|
|
var parts = sitePathRegex.exec(url)
|
|
if (!parts) {
|
|
var err = new Error('Not found')
|
|
err.notFound = true
|
|
return cb(err)
|
|
}
|
|
|
|
var pid = parts[1]
|
|
var path = parts[2]
|
|
if (path.charAt(0) == '/')
|
|
path = path.slice(1) // skip the preceding slash
|
|
if (!path)
|
|
path = 'index.html' // default asset
|
|
|
|
// lookup the link
|
|
var link = (state.sites[pid]) ? state.sites[pid][path] : null
|
|
if (!link) {
|
|
var err = new Error('Not found')
|
|
err.notFound = true
|
|
return cb(err)
|
|
}
|
|
cb(null, link)
|
|
})
|
|
}
|
|
|
|
api.getProfile = function (id, cb) {
|
|
awaitSync(function () { cb(null, state.profiles[id]) })
|
|
}
|
|
api.getAllProfiles = function (cb) {
|
|
awaitSync(function () { cb(null, state.profiles) })
|
|
}
|
|
api.getNamesById = function (cb) {
|
|
awaitSync(function () { cb(null, state.names) })
|
|
}
|
|
api.getName = function (id, cb) {
|
|
awaitSync(function () { cb(null, state.names[id]) })
|
|
}
|
|
api.getIdsByName = function (cb) {
|
|
awaitSync(function () { cb(null, state.ids) })
|
|
}
|
|
api.getActionItems = function (cb) {
|
|
awaitSync(function () { cb(null, state.actionItems) })
|
|
}
|
|
|
|
// helper to get an option off an opt function (avoids the `opt || {}` pattern)
|
|
function o (opts, k, def) {
|
|
return opts && opts[k] !== void 0 ? opts[k] : def
|
|
}
|
|
|
|
// helper to get messages from an index
|
|
function indexStreamFn (index, getkey) {
|
|
return function (opts) {
|
|
// emulate the `ssb.createFeedStream` interface
|
|
var lt = o(opts, 'lt')
|
|
var lte = o(opts, 'lte')
|
|
var gt = o(opts, 'gt')
|
|
var gte = o(opts, 'gte')
|
|
var limit = o(opts, 'limit')
|
|
|
|
// lt, lte, gt, gte should look like:
|
|
// [msg.value.timestamp, msg.value.author]
|
|
|
|
// helper to create emittable rows
|
|
function lookup (row) {
|
|
if (!row) return
|
|
var key = (getkey) ? getkey(row) : row.key
|
|
if (key) {
|
|
var rowcopy = { key: key }
|
|
for (var k in row) { // copy index attrs into rowcopy
|
|
if (!rowcopy[k]) rowcopy[k] = row[k]
|
|
}
|
|
return rowcopy
|
|
}
|
|
}
|
|
|
|
// helper to fetch rows
|
|
function fetch (row, cb) {
|
|
sbot.get(row.key, function (err, value) {
|
|
// if (err) {
|
|
// suppress this error
|
|
// the message isnt in the local cache (yet)
|
|
// but it got into the index, likely due to a link
|
|
// instead of an error, we'll put a null there to indicate the gap
|
|
// }
|
|
row.value = value
|
|
cb(null, row)
|
|
})
|
|
}
|
|
|
|
// readstream
|
|
var readPush = pushable()
|
|
var read = pull(readPush, paramap(fetch))
|
|
|
|
// await sync, then emit the reads
|
|
awaitSync(function () {
|
|
var added = 0
|
|
for (var i=0; i < index.rows.length; i++) {
|
|
var row = index.rows[i]
|
|
|
|
if (limit && added >= limit)
|
|
break
|
|
|
|
// we're going to only look at timestamp, because that's all that phoenix cares about
|
|
var invalid = !!(
|
|
(lt && row.ts >= lt[0]) ||
|
|
(lte && row.ts > lte[0]) ||
|
|
(gt && row.ts <= gt[0]) ||
|
|
(gte && row.ts < gte[0])
|
|
)
|
|
if (invalid)
|
|
continue
|
|
|
|
var r = lookup(row)
|
|
if (r) {
|
|
readPush.push(r)
|
|
added++
|
|
}
|
|
}
|
|
readPush.end()
|
|
})
|
|
|
|
if (opts && opts.live) {
|
|
// live stream, concat the live-emitter on the end
|
|
index.on('add', onadd)
|
|
var livePush = pushable(function () { index.removeListener('add', onadd) })
|
|
function onadd (row) { livePush.push(lookup(row)) }
|
|
var live = pull(livePush, paramap(fetch))
|
|
return cat([read, live])
|
|
}
|
|
return read
|
|
}
|
|
}
|
|
|
|
return api
|
|
} |