* threaded parser
[m6w6/ext-psi] / src / context.c
index ecccdb0546d88e16bc8122fb68eb5a820e38106f..4dcd92092f6cecd65ac69e872741cdc7f3a7eb92 100644 (file)
 # endif
 #endif
 
+#include <unistd.h>
 #include <fnmatch.h>
 
+#if PSI_THREADED_PARSER
+# include <pthread.h>
+#endif
+
 #include "php_scandir.h"
 #include "php_psi.h"
 #include "calc.h"
@@ -129,14 +134,6 @@ struct psi_context *psi_context_init(struct psi_context *C, struct psi_context_o
        return C;
 }
 
-static int psi_select_dirent(const struct dirent *entry)
-{
-#ifndef FNM_CASEFOLD
-# define FNM_CASEFOLD 0
-#endif
-       return 0 == fnmatch("*.psi", entry->d_name, FNM_CASEFOLD);
-}
-
 static bool psi_context_add(struct psi_context *C, struct psi_parser *P)
 {
        bool valid;
@@ -154,61 +151,214 @@ static bool psi_context_add(struct psi_context *C, struct psi_parser *P)
        return valid;
 }
 
+struct psi_context_build_worker {
+       pthread_t tid;
+       struct psi_parser parser;
+       struct psi_parser_input *input;
+       char psi_file[MAXPATHLEN];
+};
+
+static struct psi_context_build_worker *psi_context_build_worker_init(
+               struct psi_context *C, const char *dir, const char *file)
+{
+       struct psi_context_build_worker *w = pecalloc(1, sizeof(*w), 1);
+
+       if (MAXPATHLEN <= slprintf(w->psi_file, MAXPATHLEN, "%s/%s", dir, file)) {
+               C->error(PSI_DATA(C), NULL, PSI_WARNING, "Path to PSI file too long: %s/%s",
+                       dir, file);
+               pefree(w, 1);
+               return NULL;
+       }
+       if (!psi_parser_init(&w->parser, C->error, C->flags)) {
+               C->error(PSI_DATA(C), NULL, PSI_WARNING, "Failed to init PSI parser (%s): %s",
+                       w->psi_file, strerror(errno));
+               pefree(w, 1);
+               return NULL;
+       }
+       return w;
+}
+
+#if PSI_THREADED_PARSER
+static void *psi_context_build_worker_thread(void *thread_ptr)
+{
+       struct psi_context_build_worker *thread = thread_ptr;
+       psi_parser_parse(&thread->parser, thread->input);
+       return NULL;
+}
+
+static bool psi_context_build_worker_thread_start(
+               struct psi_context_build_worker *w)
+{
+       unsigned tries = 0;
+       int rc;
+
+again: ;
+       rc = pthread_create(&w->tid, NULL, psi_context_build_worker_thread, w);
+
+       switch (rc) {
+       case 0:
+               return true;
+       case EAGAIN:
+               if (tries++ < 10) {
+                       goto again;
+               }
+               /* no break */
+       default:
+               w->parser.error(PSI_DATA(&w->parser), NULL, PSI_WARNING,
+                               "Failed to start parser thread: %s", strerror(rc));
+               w->tid = 0;
+               return false;
+       }
+}
+#endif
+
+static bool psi_context_build_worker_exec(struct psi_context_build_worker *w)
+{
+       if (!(w->input = psi_parser_open_file(&w->parser, w->psi_file, true))) {
+               w->parser.error(PSI_DATA(&w->parser), NULL, PSI_WARNING,
+                               "Failed to open PSI file (%s): %s", w->psi_file, strerror(errno));
+               return false;
+       }
+#if PSI_THREADED_PARSER
+       return psi_context_build_worker_thread_start(w);
+#else
+       return psi_parser_parse(&w->parser, w->input);
+#endif
+}
+
+static bool psi_context_build_worker_done(struct psi_context_build_worker *w)
+{
+#if PSI_THREADED_PARSER
+       void *rval = NULL;
+
+       if (!w->tid) {
+               return true;
+       }
+
+# if HAVE_PTHREAD_TRYJOIN_NP
+       if (0 == pthread_tryjoin_np(w->tid, &rval)) {
+               w->tid = 0;
+               return true;
+       }
+# else
+       if (0 == pthread_join(w->tid, &rval)) {
+               w->tid = 0;
+               return true;
+       }
+# endif
+       return false;
+#else
+       return true;
+#endif
+}
+
+static void psi_context_build_worker_dtor(struct psi_context_build_worker *w)
+{
+#if PSI_THREADED_PARSER
+       if (w->tid) {
+               void *rval;
+               int rc = pthread_join(w->tid, &rval);
+
+               if (rc) {
+                       w->parser.error(PSI_DATA(&w->parser), NULL, PSI_WARNING,
+                                       "Failed to finish parser thread: %s", strerror(errno));
+               }
+       }
+#endif
+       psi_parser_input_free(&w->input);
+       psi_parser_dtor(&w->parser);
+}
+
+static void psi_context_build_worker_free(struct psi_context_build_worker **w)
+{
+       if (*w) {
+               psi_context_build_worker_dtor(*w);
+               pefree(*w, 1);
+               *w = NULL;
+       }
+}
+
+static int psi_select_dirent(const struct dirent *entry)
+{
+#ifndef FNM_CASEFOLD
+# define FNM_CASEFOLD 0
+#endif
+       return 0 == fnmatch("*.psi", entry->d_name, FNM_CASEFOLD);
+}
+
 void psi_context_build(struct psi_context *C, const char *paths)
 {
-       int i, n;
        char *sep = NULL, *cpy = strdup(paths), *ptr = cpy;
-       struct dirent **entries;
+       struct psi_context_build_worker *worker;
+       struct psi_plist *workers = psi_plist_init(
+                       (psi_plist_dtor) psi_context_build_worker_free);
 
        do {
-               sep = strchr(ptr, ':');
+               struct dirent **entries = NULL;
+               int i, n;
 
-               if (sep) {
+               if ((sep = strchr(ptr, ':'))) {
                        *sep = 0;
                }
 
-               entries = NULL;
                n = php_scandir(ptr, &entries, psi_select_dirent, alphasort);
 
-               if (n > 0) {
+               if (n < 0) {
+                       C->error(PSI_DATA(C), NULL, PSI_WARNING,
+                                       "Failed to scan PSI directory '%s':%s", strerror(errno));
+               } else {
                        for (i = 0; i < n; ++i) {
-                               char psi[MAXPATHLEN];
-                               struct psi_parser P;
-                               struct psi_parser_input *I;
-
-                               if (MAXPATHLEN <= slprintf(psi, MAXPATHLEN, "%s/%s", ptr, entries[i]->d_name)) {
-                                       C->error(PSI_DATA(C), NULL, PSI_WARNING, "Path to PSI file too long: %s/%s",
-                                               ptr, entries[i]->d_name);
-                               }
-                               if (!psi_parser_init(&P, C->error, C->flags)) {
-                                       C->error(PSI_DATA(C), NULL, PSI_WARNING, "Failed to init PSI parser (%s): %s",
-                                               psi, strerror(errno));
-                                       continue;
-                               }
-                               if (!(I = psi_parser_open_file(&P, psi, true))) {
-                                       C->error(PSI_DATA(C), NULL, PSI_WARNING, "Failed to open PSI file (%s): %s",
-                                               psi, strerror(errno));
-                                       continue;
+                               worker = psi_context_build_worker_init(C, ptr, entries[i]->d_name);
+                               if (worker) {
+                                       workers = psi_plist_add(workers, &worker);
                                }
-                               psi_parser_parse(&P, I);
-                               psi_context_add(C, &P);
-                               psi_parser_dtor(&P);
-                               psi_parser_input_free(&I);
-                       }
-               }
-
-               if (entries) {
-                       for (i = 0; i < n; ++i) {
                                free(entries[i]);
                        }
                        free(entries);
                }
-
                ptr = sep + 1;
        } while (sep);
 
        free(cpy);
 
+       if (psi_plist_count(workers)) {
+               struct psi_plist *running = psi_plist_init(
+                               (psi_plist_dtor) psi_context_build_worker_free);
+               long active = 0;
+#ifdef _SC_NPROCESSORS_ONLN
+               long pool = sysconf(_SC_NPROCESSORS_ONLN);
+#else
+               long pool = 4;
+#endif
+
+               while (psi_plist_count(workers) && active < pool) {
+                       if (psi_plist_pop(workers, &worker)) {
+                               if (psi_context_build_worker_exec(worker)) {
+                                       running = psi_plist_add(running, &worker);
+                                       ++active;
+                               }
+                       }
+               }
+               while (active) {
+                       size_t i = 0;
+
+                       while (psi_plist_get(running, i++, &worker)) {
+                               if (psi_context_build_worker_done(worker)) {
+                                       psi_context_add(C, &worker->parser);
+
+                                       psi_plist_del(running, --i, NULL);
+                                       psi_context_build_worker_free(&worker);
+
+                                       if (psi_plist_pop(workers, &worker)) {
+                                               psi_plist_add(running, &worker);
+                                       } else {
+                                               --active;
+                                       }
+                               }
+                       }
+               }
+       }
+
        psi_context_compile(C);
 }