[Xapian-discuss] Multithread problem: Writing to a db disables reading from another one
Robert Pollak
robert.pollak at fabasoft.com
Tue Jul 13 12:32:25 BST 2004
Hi to all,
I have Xapian 0.8.1 installed on a single processor RH9 machine with a custom 2.4.20 kernel.
(I used ./configure without parameters.)
I have followed the recent "threaded test" discussion here, and I have tested the examples:
- Richard Boulton's example with one database, one writer, and three readers (1)
works fine; I only see the expected DatabaseModifiedError.
- Eric B. Ridge's example with just two writers to separate databases (2)
also shows no errors (I stopped after 162 docs were added).
I then expanded Richard's example by adding a second writer that writes to a separate database (see attachment).
(I am also reopening the db if necessary.)
Now the queries give empty results!
Commenting out the creation of _writer2 makes the queries return the expected results again (3).
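For reference, the reopen-if-necessary step mentioned above boils down to a retry loop like the following sketch. It is independent of Xapian: ModifiedError is a hypothetical stand-in for Xapian::DatabaseModifiedError, query_with_reopen is a name made up for illustration, and the max_retries bound is an assumption (the attached program simply retries without a limit).

```cpp
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for Xapian::DatabaseModifiedError (not a Xapian class).
struct ModifiedError : std::runtime_error {
    ModifiedError() : std::runtime_error("database modified") {}
};

// Runs `query`. Whenever it throws ModifiedError, calls `reopen` and tries
// again. After `max_retries` reopens, a further failure is propagated.
template <typename Result>
Result query_with_reopen(const std::function<Result()>& query,
                         const std::function<void()>& reopen,
                         int max_retries = 5)
{
    for (int attempt = 0; ; ++attempt) {
        try {
            return query();
        } catch (const ModifiedError&) {
            if (attempt >= max_retries) {
                throw;  // give up and let the caller see the error
            }
            reopen();
        }
    }
}
```

In the attached program, the query callable would correspond to the enq.get_mset() call and the reopen callable to db.reopen().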
I wonder whether Olly or someone else can reproduce this.
I have not tried modifying any Xapian compile options, because I am an automake newbie.
Would I have to edit Makefile.in (and how would I do this), or would I just pass arguments to configure?
Footnotes:
1: from 17 Jun 2004 14:25:37 +0100, see <http://article.gmane.org/gmane.comp.search.xapian.general/892>
2: from 17 Jun 2004 00:30:50 +0000, see <http://article.gmane.org/gmane.comp.search.xapian.general/876>
3:
$ ./xapian_threads
Writer opening 1.db
Writer flushing 1.db
Reader opening 1.db
1.db results: 10
Reader opening 1.db
1.db results: 10
Reader opening 1.db
1.db results: 10
Readers started
Writer flushing 1.db
1.db results: 10
1.db results: 10
1.db results: 10
Writer flushing 1.db
Reader reopening 1.db
1.db results: 30
Reader reopening 1.db
1.db results: 30
Reader reopening 1.db
1.db results: 30
1.db results: 30
1.db results: 30
1.db results: 30
Writer flushing 1.db
1.db results: 30
1.db results: 30
1.db results: 30
Reader reopening 1.db
1.db results: 40
--
Robert Pollak
GPG Key ID: 748646AD
-------------- next part --------------
#include <xapian.h>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#define MAX_THREADS 3
using namespace std;
pthread_t _writer1, _writer2;
pthread_t _readers[MAX_THREADS];
pthread_mutex_t output_mutex;
void msg(const string& msg)
{
    pthread_mutex_lock(&output_mutex);
    cout << msg << endl;
    pthread_mutex_unlock(&output_mutex);
}
void *do_writes(void *param)
{
    char *dbpath = (char *)param;
    msg(string("Writer opening ") + dbpath);
    unlink((string(dbpath) + "/db_lock").c_str());
    Xapian::WritableDatabase db = Xapian::Auto::open(dbpath, Xapian::DB_CREATE_OR_OVERWRITE);
    char *rnd = (char *)malloc(255);
    memset(rnd, 0, 255);
    long cnt = 0;
    try {
        // endlessly add documents with 2000 postings each to the database
        while(true) {
            Xapian::Document doc;
            int i = 0;
            for(; i < 1000; i++) {
                doc.add_posting("random", i);
                sprintf(rnd, "%d", rand());
                doc.add_posting(rnd, i);
            }
            db.add_document(doc);
            if(++cnt % 10 == 0) {
                // flush every 10 documents
                db.flush();
                msg(string("Writer flushing ") + dbpath);
            }
        }
    }
    catch(Xapian::Error & err) {
        msg(string("Writer accessing ") + dbpath + ": ERROR=" + err.get_msg());
    }
    msg("Writer thread unexpectedly exited...");
    pthread_exit(NULL);
}
void *do_reads(void *param)
{
    char *dbpath = (char *)param;
    msg(string("Reader opening ") + dbpath);
    try {
        // create the reader database once and continue to query it.
        Xapian::Database db = Xapian::Auto::open(dbpath);
        while(true) {
            Xapian::Enquire enq(db);
            Xapian::Query query("random");
            Xapian::MSet set;
            enq.set_query(query);
            bool dbModified;
            do {
                dbModified = false;
                try {
                    set = enq.get_mset(0, 2500);
                    // walk the results to exercise the mset iterator
                    for(Xapian::MSetIterator i = set.begin(); i != set.end(); ++i) {
                    }
                }
                catch(const Xapian::DatabaseModifiedError &) {
                    dbModified = true;
                    msg(string("Reader reopening ") + dbpath);
                    db.reopen();
                }
            } while(dbModified); // repeat only on error
            stringstream result;
            result << dbpath << " results: " << set.size();
            msg(result.str());
            sleep(1); // make the log more readable
        }
    }
    catch(Xapian::Error & err) {
        msg(string("Reader accessing ") + dbpath + ": ERROR=" + err.get_msg());
    }
    pthread_exit(NULL);
}
void setup_writer_threads()
{
    pthread_create(&_writer1, NULL, do_writes, (void *)"1.db");
    pthread_create(&_writer2, NULL, do_writes, (void *)"2.db");
}
void setup_reader_threads()
{
    int i = 0;
    for(; i < MAX_THREADS; i++) {
        int success = pthread_create(&(_readers[i]), NULL, do_reads, (void *)"1.db") == 0;
        if(!success) {
            msg("setup_reader_threads: Could not create thread");
        }
    }
}
int main(int argc, char **argv)
{
    int i = 0;
    pthread_mutex_init(&output_mutex, NULL);
    setup_writer_threads();
    // let the writer threads get going before we start a bunch of readers
    sleep(2);
    setup_reader_threads();
    msg("Readers started");
    // sit and wait for everything to finish
    // in a perfect world, they'll never finish
    pthread_join(_writer1, NULL);
    pthread_join(_writer2, NULL);
    for(; i < MAX_THREADS; i++) {
        pthread_join(_readers[i], NULL);
    }
    pthread_mutex_destroy(&output_mutex);
    return 0;
}