// Patchwork plugin for scuttlebot (sbot).
// Consumes the sbot message log via pull-streams, maintains in-memory indexes
// (inbox, votes, follows, home, ...) and views (profiles, names, sites), and
// exposes a query/mutation API over them. Read-state and subscriptions are
// persisted in leveldb sublevels; everything else is rebuilt from the log on
// startup.
var fs = require('fs')
var pull = require('pull-stream')
var multicb = require('multicb')
var pl = require('pull-level')
var pushable = require('pull-pushable')
var paramap = require('pull-paramap')
var cat = require('pull-cat')
var Notify = require('pull-notify')
var toPull = require('stream-to-pull-stream')
var ref = require('ssb-ref')
var pathlib = require('path')
var u = require('./util')

// Plugin metadata consumed by sbot's plugin loader.
exports.name = 'patchwork'
exports.version = '1.0.0'
exports.manifest = require('./manifest')
exports.permissions = require('./permissions')

// Plugin entry point. `sbot` is the scuttlebot instance, `opts` its config
// (only `opts.path` is read here). Returns the `api` object described by the
// manifest.
exports.init = function (sbot, opts) {
  var api = {}
  // persistent storage, namespaced under the 'patchwork' sublevel
  var phoenixdb = sbot.sublevel('patchwork')
  var db = {
    isread: phoenixdb.sublevel('isread'), // msg key -> 1 when marked read
    subscribed: phoenixdb.sublevel('subscribed') // key -> 1 when subscribed
  }
  // all in-memory derived state, rebuilt from the log by ./processor
  var state = {
    // indexes (lists of {key:, ts:})
    mymsgs: [],
    home: u.index(), // also has `.isread`
    inbox: u.index(), // also has `.isread` and `.author`
    votes: u.index(), // also has `.isread`, `.vote`, and `.votemsg`
    myvotes: u.index(), // also has `.vote`
    follows: u.index(), // also has `.isread` and `.following`
    // views
    profiles: {},
    sites: {},
    names: {}, // ids -> names
    ids: {}, // names -> ids
    actionItems: {}
  }

  // feed every log entry (past and live) through the processor, which
  // populates `state` and calls `emit` for events
  var processor = require('./processor')(sbot, db, state, emit)
  pull(pl.read(sbot.sublevel('log'), { live: true, onSync: onPrehistorySync }), pull.drain(processor))

  // track sync state
  // - processor does async processing for each message that comes in
  // - awaitSync() waits for that processing to finish
  // - pinc() on message arrival, pdec() on message processed
  // - nP === 0 => all messages processed
  var nP = 0, syncCbs = []
  // run `cb` once all in-flight message processing has settled (immediately
  // if nothing is pending)
  function awaitSync (cb) {
    if (nP > 0) syncCbs.push(cb)
    else cb()
  }
  state.pinc = function () { nP++ }
  state.pdec = function () {
    nP--
    if (nP === 0) {
      // drain the waiters queued by awaitSync, then reset the queue
      syncCbs.forEach(function (cb) { cb() })
      syncCbs.length = 0
    }
  }

  var isPreHistorySynced = false // track so we dont emit events for old messages
  // grab for history sync
  state.pinc()
  // called by pull-level when the historical portion of the log has been read
  function onPrehistorySync () {
    console.log('Log history read...')
    // when all current items finish, consider prehistory synced (and start emitting)
    awaitSync(function () {
      console.log('Indexes generated')
      isPreHistorySynced = true
    })
    // release
    state.pdec()
  }

  // events stream
  var notify = Notify()
  // broadcast an event to createEventStream listeners; suppressed until the
  // initial history scan completes. NOTE: mutates `data` in place by adding
  // `type` (and, for index-change, `total`/`unread`).
  function emit (type, data) {
    if (!isPreHistorySynced) return
    var e = data || {}
    e.type = type
    if (e.type == 'index-change') {
      // enrich index-change events with current totals before notifying
      api.getIndexCounts(function (err, counts) {
        e.total = counts[e.index]
        e.unread = counts[e.index+'Unread']
        notify(e)
      })
    } else notify(e)
  }

  // getters

  // live pull-stream of events emitted via `emit`
  api.createEventStream = function () {
    return notify.listen()
  }

  // filesystem paths used by the app (currently only the site publish dir)
  api.getPaths = function (cb) {
    cb(null, { site: pathlib.join(opts.path, 'publish') })
  }

  // profile of the local user (waits for indexes to settle)
  api.getMyProfile = function (cb) {
    awaitSync(function () {
      api.getProfile(sbot.id, cb)
    })
  }

  // true if the row's author is the local user or someone the local user
  // follows (inbox is restricted to friends)
  function isInboxFriend (row) {
    if (row.author == sbot.id) return true
    var p = state.profiles[sbot.id]
    if (!p) return false
    return p.assignedTo[row.author] && p.assignedTo[row.author].following
  }

  // total and unread counts for each index; used for badges and to enrich
  // index-change events.
  // NOTE(review): mixes `state.inbox.rows.filter` and `state.inbox.filter` —
  // presumably u.index() exposes both; confirm against ./util.
  api.getIndexCounts = function (cb) {
    awaitSync(function () {
      cb(null, {
        inbox: state.inbox.rows.filter(isInboxFriend).length,
        inboxUnread: state.inbox.filter(function (row) { return isInboxFriend(row) && row.author != sbot.id && !row.isread }).length,
        votes: state.votes.filter(function (row) { return row.vote > 0 }).length,
        votesUnread: state.votes.filter(function (row) { return row.vote > 0 && !row.isread }).length,
        follows: state.follows.filter(function (row) { return row.following }).length,
        followsUnread: state.follows.filter(function (row) { return row.following && !row.isread }).length,
        home: state.home.rows.length
      })
    })
  }

  // index read-streams; the optional second arg filters rows and picks which
  // msg key the stream fetches (returning false skips the row)
  api.createInboxStream = indexStreamFn(state.inbox, function (row) {
    if (!isInboxFriend(row)) return false
    return row.key
  })
  api.createVoteStream = indexStreamFn(state.votes, function (row) {
    if (row.vote <= 0) return false
    return row.votemsg // emit the voted-on message, not the vote itself
  })
  api.createMyvoteStream = indexStreamFn(state.myvotes, function (row) {
    if (row.vote <= 0) return false
    return row.key
  })
  api.createFollowStream = indexStreamFn(state.follows)
  api.createHomeStream = indexStreamFn(state.home)

  // mark a row (or array of rows) read in the named in-memory index, emitting
  // an index-change event when the flag actually flips. `keyname` selects the
  // row attribute to match on (defaults handled by index.find).
  function indexMarkRead (indexname, key, keyname) {
    if (Array.isArray(key)) {
      key.forEach(function (k) { indexMarkRead(indexname, k, keyname) })
      return
    }
    var index = state[indexname]
    var row = index.find(key, keyname)
    if (row) {
      var wasread = row.isread
      row.isread = true
      if (!wasread) emit('index-change', { index: indexname })
    }
  }
  // inverse of indexMarkRead
  function indexMarkUnread (indexname, key, keyname) {
    if (Array.isArray(key)) {
      key.forEach(function (k) { indexMarkUnread(indexname, k, keyname) })
      return
    }
    var index = state[indexname]
    var row = index.find(key, keyname)
    if (row) {
      var wasread = row.isread
      row.isread = false
      if (wasread) emit('index-change', { index: indexname })
    }
  }

  // mark message(s) read: updates every relevant in-memory index, then
  // persists to the isread sublevel (batch for arrays)
  api.markRead = function (key, cb) {
    indexMarkRead('inbox', key)
    indexMarkRead('votes', key, 'votemsg')
    indexMarkRead('follows', key)
    if (Array.isArray(key)) db.isread.batch(key.map(function (k) { return { type: 'put', key: k, value: 1 }}), cb)
    else db.isread.put(key, 1, cb)
  }
  // mark message(s) unread: in-memory indexes plus deletion from the isread
  // sublevel
  api.markUnread = function (key, cb) {
    indexMarkUnread('inbox', key)
    indexMarkUnread('votes', key, 'votemsg')
    indexMarkUnread('follows', key)
    if (Array.isArray(key)) db.isread.batch(key.map(function (k) { return { type: 'del', key: k }}), cb)
    else db.isread.del(key, cb)
  }
  // flip read state; cb(err, nowRead)
  api.toggleRead = function (key, cb) {
    api.isRead(key, function (err, v) {
      if (!v) {
        api.markRead(key, function (err) { cb(err, true) })
      } else {
        api.markUnread(key, function (err) { cb(err, false) })
      }
    })
  }
  // cb(null, bool) for a single key, cb(null, [bool]) for an array.
  // leveldb "not found" errors are deliberately treated as "unread".
  api.isRead = function (key, cb) {
    if (Array.isArray(key)) {
      var done = multicb({ pluck: 1 })
      key.forEach(function (k, i) {
        var cb = done() // shadowed on purpose: one slot per key
        db.isread.get(k, function (err, v) { cb(null, !!v) })
      })
      done(cb)
    } else {
      db.isread.get(key, function (err, v) { cb && cb(null, !!v) })
    }
  }

  // subscription flags, persisted in the subscribed sublevel
  api.subscribe = function (key, cb) {
    db.subscribed.put(key, 1, cb)
  }
  api.unsubscribe = function (key, cb) {
    db.subscribed.del(key, cb)
  }
  // flip subscription; cb(err, nowSubscribed)
  api.toggleSubscribed = function (key, cb) {
    api.isSubscribed(key, function (err, v) {
      if (!v) {
        api.subscribe(key, function (err) { cb(err, true) })
      } else {
        api.unsubscribe(key, function (err) { cb(err, false) })
      }
    })
  }
  // cb(null, bool); "not found" reads as false
  api.isSubscribed = function (key, cb) {
    db.subscribed.get(key, function (err, v) { cb && cb(null, !!v) })
  }

  // stream a local file into the blob store; for images, also report
  // dimensions. cb(null, { hash[, width, height] }) — exact dim keys come
  // from native-image getSize().
  api.addFileToBlobs = function (path, cb) {
    pull(
      toPull.source(fs.createReadStream(path)),
      sbot.blobs.add(function (err, hash) {
        if (err) cb(err)
        else {
          var ext = pathlib.extname(path)
          if (ext == '.png' || ext == '.jpg' || ext == '.jpeg') {
            var res = getImgDim(path)
            res.hash = hash
            cb(null, res)
          } else cb(null, { hash: hash })
        }
      })
    )
  }
  // stream a blob out of the blob store into a local file
  api.saveBlobToFile = function (hash, path, cb) {
    pull(
      sbot.blobs.get(hash),
      toPull.sink(fs.createWriteStream(path), cb)
    )
  }
  // synchronous image-size lookup; native-image is required lazily since it
  // is only available in the electron runtime
  function getImgDim (path) {
    var NativeImage = require('native-image')
    var ni = NativeImage.createFromPath(path)
    return ni.getSize()
  }

  // lookup codes look like "@<feedid>[via]host:port:key,host:port:key,..."
  var lookupcodeRegex = /(@[a-z0-9\/\+\=]+\.[a-z0-9]+)(?:\[via\])?(.+)?/i
  // resolve a lookup code: parse out the feed id and any suggested peer
  // addresses, then try peers one at a time until one replicates the feed.
  // Returns a pushable pull-stream of progress events
  // ({connecting|syncing|error|finished}); ends when done or exhausted.
  api.useLookupCode = function (code) {
    var eventPush = pushable()

    // parse and validate the code
    var id, addrs
    var parts = lookupcodeRegex.exec(code)
    var valid = true
    if (parts) {
      id = parts[1]
      addrs = (parts[2]) ? parts[2].split(',') : []
      // validate id
      if (!ref.isFeedId(id)) valid = false
      // parse addresses (malformed entries are dropped by filter(Boolean))
      addrs = addrs
        .map(function (addr) {
          addr = addr.split(':')
          if (addr.length === 3) return { host: addr[0], port: +addr[1], key: addr[2] }
        })
        .filter(Boolean)
    } else valid = false
    if (!valid) {
      eventPush.push({ type: 'error', message: 'Invalid lookup code' })
      eventPush.end()
      return eventPush
    }

    // begin the search! (suggested addrs first, then known gossip peers;
    // `search` is hoisted so this call is safe)
    search(addrs.concat(sbot.gossip.peers()))
    // pop peers off the list and try each until one yields the feed
    function search (peers) {
      var peer = peers.pop()
      if (!peer) return eventPush.end()

      // connect to the peer
      eventPush.push({ type: 'connecting', addr: peer })
      sbot.connect(peer, function (err, rpc) {
        if (err) {
          eventPush.push({ type: 'error', message: 'Failed to connect', err: err })
          return search(peers)
        }
        // try a sync
        sync(rpc, function (err, seq) {
          if (seq > 0) {
            // success!
            eventPush.push({ type: 'finished', seq: seq })
            eventPush.end()
          } else search(peers) // try next
        })
      })
    }
    // replicate the target feed from `rpc` into the local log;
    // cb(err, lastSeqSeen) — seq stays undefined if no messages arrived
    function sync (rpc, cb) {
      // fetch the feed
      var seq
      eventPush.push({ type: 'syncing', id: id })
      pull(
        rpc.createHistoryStream({ id: id, keys: false }),
        pull.through(function (msg) { seq = msg.sequence }),
        sbot.createWriteStream(function (err) { cb(err, seq) })
      )
    }
    return eventPush
  }

  // site view getter (waits for indexes)
  api.getSite = function (id, cb) {
    awaitSync(function () { cb(null, state.sites[id]) })
  }
  // site urls look like "@<id>.ed25519/<asset path>"
  var sitePathRegex = /(@.*\.ed25519)(.*)/
  // resolve a site url to the blob link for the requested asset;
  // errors carry `.notFound = true` for 404-style handling
  api.getSiteLink = function (url, cb) {
    awaitSync(function () {
      // parse url
      var parts = sitePathRegex.exec(url)
      if (!parts) {
        var err = new Error('Not found')
        err.notFound = true
        return cb(err)
      }
      var pid = parts[1]
      var path = parts[2]
      if (path.charAt(0) == '/') path = path.slice(1) // skip the preceding slash
      if (!path) path = 'index.html' // default asset

      // lookup the link
      var link = (state.sites[pid]) ? state.sites[pid][path] : null
      if (!link) {
        var err = new Error('Not found')
        err.notFound = true
        return cb(err)
      }
      cb(null, link)
    })
  }

  // simple view getters — each waits for in-flight processing so callers see
  // a consistent snapshot
  api.getProfile = function (id, cb) {
    awaitSync(function () { cb(null, state.profiles[id]) })
  }
  api.getAllProfiles = function (cb) {
    awaitSync(function () { cb(null, state.profiles) })
  }
  api.getNamesById = function (cb) {
    awaitSync(function () { cb(null, state.names) })
  }
  api.getName = function (id, cb) {
    awaitSync(function () { cb(null, state.names[id]) })
  }
  api.getIdsByName = function (cb) {
    awaitSync(function () { cb(null, state.ids) })
  }
  api.getActionItems = function (cb) {
    awaitSync(function () { cb(null, state.actionItems) })
  }

  // helper to get an option off an opt function (avoids the `opt || {}` pattern)
  function o (opts, k, def) {
    return opts && opts[k] !== void 0 ? opts[k] : def
  }

  // helper to get messages from an index
  // Returns a createFeedStream-style function over `index`; `getkey`
  // optionally filters rows / picks the message key to fetch (see the
  // create*Stream definitions above).
  function indexStreamFn (index, getkey) {
    return function (opts) {
      // emulate the `ssb.createFeedStream` interface
      var lt = o(opts, 'lt')
      var lte = o(opts, 'lte')
      var gt = o(opts, 'gt')
      var gte = o(opts, 'gte')
      var limit = o(opts, 'limit')
      // lt, lte, gt, gte should look like:
      // [msg.value.timestamp, msg.value.author]

      // helper to create emittable rows
      function lookup (row) {
        if (!row) return
        var key = (getkey) ? getkey(row) : row.key
        if (key) {
          var rowcopy = { key: key }
          for (var k in row) { // copy index attrs into rowcopy
            if (!rowcopy[k]) rowcopy[k] = row[k]
          }
          return rowcopy
        }
      }

      // helper to fetch rows (attaches the full message value via sbot.get)
      function fetch (row, cb) {
        sbot.get(row.key, function (err, value) {
          // if (err) { // suppress this error
            // the message isnt in the local cache (yet)
            // but it got into the index, likely due to a link
            // instead of an error, we'll put a null there to indicate the gap
          // }
          row.value = value
          cb(null, row)
        })
      }

      // readstream
      var readPush = pushable()
      var read = pull(readPush, paramap(fetch))

      // await sync, then emit the reads
      awaitSync(function () {
        var added = 0
        for (var i=0; i < index.rows.length; i++) {
          var row = index.rows[i]
          if (limit && added >= limit) break

          // we're going to only look at timestamp, because that's all that phoenix cares about
          var invalid = !!(
            (lt && row.ts >= lt[0]) ||
            (lte && row.ts > lte[0]) ||
            (gt && row.ts <= gt[0]) ||
            (gte && row.ts < gte[0])
          )
          if (invalid) continue

          var r = lookup(row)
          if (r) {
            readPush.push(r)
            added++
          }
        }
        readPush.end()
      })

      if (opts && opts.live) {
        // live stream, concat the live-emitter on the end
        // (onadd is hoisted, so registering it before its declaration is fine;
        // the pushable's close callback unregisters it)
        index.on('add', onadd)
        var livePush = pushable(function () {
          index.removeListener('add', onadd)
        })
        function onadd (row) { livePush.push(lookup(row)) }
        var live = pull(livePush, paramap(fetch))
        return cat([read, live])
      }
      return read
    }
  }

  return api
}