class HashDB { ... public: bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) { _assert_(visitor && thnum <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if ((int64_t)thnum > bnum_) thnum = bnum_; ScopedVisitor svis(visitor); bool err = false; if (!scan_parallel_impl(visitor, thnum, checker)) err = true; trigger_meta(MetaTrigger::ITERATE, "iterate"); return !err; } private: bool scan_parallel_impl(Visitor *visitor, size_t thnum, ProgressChecker* checker) { int64_t allcnt = count_; if (checker && !checker->check("iterate", "beginning", -1, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } double range = (double)bnum_ / thnum; class ThreadImpl : public Thread { public: explicit ThreadImpl() : db_(NULL), visitor_(NULL), checker_(NULL), begidx_(0), endidx_(0), error_(false) {} void init(HashDB* db, Visitor* visitor, ProgressChecker* checker, int64_t allcnt, int64_t begidx, int64_t endidx) { db_ = db; visitor_ = visitor; checker_ = checker; allcnt_ = allcnt; begidx_ = begidx; endidx_ = endidx; } bool error() { return error_; } private: void run() { int64_t eidx = endidx_; for (int64_t idx = begidx_; idx < eidx; idx++) { size_t lidx = idx % RLOCKSLOT; db_->rlock_.lock_reader(lidx); // reader lock for each record !! int64_t top = db_->get_bucket(idx); std::stack stack; if (top > 0) stack.push(top); while (!stack.empty()) { int64_t off = stack.top(); stack.pop(); Record rec; char rbuf[RECBUFSIZ]; rec.off = off; if (!db_->read_record(&rec, rbuf)) { error_ = true; break; } if (!rec.vbuf && !db_->read_record_body(&rec)) { delete[] rec.bbuf; error_ = true; break; } const char* vbuf = rec.vbuf; size_t vsiz = rec.vsiz; char* zbuf = NULL; size_t zsiz = 0; if (db_->comp_) { zbuf = db_->comp_->decompress(vbuf, vsiz, &zsiz); if (!zbuf) { db_->set_error(_KCCODELINE_, Error::SYSTEM, "data decompression failed"); delete[] rec.bbuf; error_ = true; break; } vbuf = zbuf; vsiz = zsiz; } visitor_->visit_full(rec.kbuf, rec.ksiz, vbuf, vsiz, &vsiz); delete[] zbuf; delete[] rec.bbuf; if (rec.left > 0) stack.push(rec.left); if (rec.right > 0) stack.push(rec.right); if (checker_ && !checker_->check("iterate", "processing", -1, allcnt_)) { db_->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); error_ = true; break; } } db_->rlock_.unlock(lidx); } } HashDB* db_; Visitor* visitor_; ProgressChecker* checker_; int64_t allcnt_; int64_t begidx_; int64_t endidx_; bool error_; }; ThreadImpl* threads = new ThreadImpl[thnum]; bool err = false; for (size_t i = 0; i < thnum; i++) { int64_t begidx = i * range; int64_t endidx = (i + 1) * range; if (i == thnum - 1) endidx = bnum_; ThreadImpl* thread = threads + i; thread->init(this, visitor, checker, allcnt, begidx, endidx); thread->start(); } for (size_t i = 0; i < thnum; i++) { ThreadImpl* thread = threads + i; if (thread->error()) err = true; thread->join(); } if (checker && !checker->check("iterate", "ending", -1, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); err = true; } delete[] threads; return !err; } ... };