Running multiple python scripts from a single script, and communicating back and forth between them?
I have a script that I can pass arguments to, and I want to launch multiple simultaneous iterations of it (maybe 100+), each with unique arguments. My plan was to write another Python script which would then launch these subscripts/processes; however, to be effective, I need that script to be able to monitor the subscripts for any errors.



Is there any straightforward way to do this, or a library that offers this functionality? I've been searching for a while and am not having much luck finding anything. Creating subprocesses and multiple threads seems straightforward enough, but I can't really find any guides or tutorials on how to then communicate with those threads/subprocesses.










      python multithreading parallel-processing






      asked Nov 22 '18 at 5:42









K. James

          3 Answers






A better way to do this would be to make use of threads. If you turn the script you want to call into a function in this larger script, you can have your main function call it as many times as you want and have the threads report back with information as needed. You can read a little bit about how threads work here.
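As a sketch of that pattern with the standard library's concurrent.futures (here `run_task` is a hypothetical stand-in for your script's entry point, and the failure condition is invented purely for illustration):

```python
import concurrent.futures

def run_task(arg):
    # Hypothetical stand-in for the imported script's main function;
    # arguments divisible by 5 fail just to demonstrate error reporting.
    if arg % 5 == 0:
        raise ValueError('bad argument: %d' % arg)
    return arg * 2

results, errors = {}, {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    # Submit one task per unique argument; map each future back to its argument
    futures = {pool.submit(run_task, arg): arg for arg in range(10)}
    for fut in concurrent.futures.as_completed(futures):
        arg = futures[fut]
        try:
            results[arg] = fut.result()  # re-raises any exception from the worker
        except Exception as exc:
            errors[arg] = exc

print('ok:', sorted(results), 'failed:', sorted(errors))
```

Each future captures the exception raised inside its task, so the launching script sees every failure without any extra plumbing.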






answered Nov 22 '18 at 6:02

Unsolved Cypher
          • Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

            – tripleee
            Nov 22 '18 at 6:19

































I suggest using threading.Thread or multiprocessing.Process, depending on your requirements.

A simple way to communicate between threads/processes is to use a Queue. The multiprocessing module also provides other ways to communicate between processes (Queue, Event, Manager, ...).

You can see some elementary communication in the example:



import threading
from queue import Queue  # Python 3 (the module was named "Queue" in Python 2)
import random
import time


class Worker(threading.Thread):
    def __init__(self, name, queue_error):
        threading.Thread.__init__(self)
        self.name = name
        self.queue_error = queue_error

    def run(self):
        time.sleep(random.randrange(1, 10))
        # Do some processing ...
        # Report errors
        self.queue_error.put((self.name, 'Error state'))


class Launcher(object):
    def __init__(self):
        self.queue_error = Queue()

    def main_loop(self):
        # Start threads
        for i in range(10):
            w = Worker(i, self.queue_error)
            w.start()
        # Check for errors
        while True:
            while not self.queue_error.empty():
                error_data = self.queue_error.get()
                print('Worker #%s reported error: %s' % (error_data[0], error_data[1]))
            time.sleep(0.1)


if __name__ == '__main__':
    l = Launcher()
    l.main_loop()





answered Nov 22 '18 at 8:33

Peter Svac














As someone else said, you have to use multiple processes instead of threads for true parallelism, because the GIL prevents threads from executing Python code concurrently.



If you want to use the standard multiprocessing library (which is based on launching multiple processes), I suggest using a pool of workers. If I understood correctly, you want to launch 100+ parallel instances. Launching 100+ processes on one host will generate too much overhead. Instead, create a pool of P workers, where P is, for example, the number of cores in your machine, and submit the 100+ jobs to that pool. This is simple to do and there are many examples on the web. Also, when you submit jobs to the pool, you can provide a callback function to receive errors. This may be sufficient for your needs (there are examples here).



The Pool in multiprocessing, however, couldn't distribute work across multiple hosts (e.g. a cluster of machines) last time I looked. So, if you need to do that, or if you need a more flexible communication scheme, such as being able to send updates to the controlling process while the workers are running, my suggestion is to use charm4py (note that I am a charm4py developer, so this is where I have experience).



            With charm4py you could create N workers which are distributed among P processes by the runtime (works across multiple hosts), and the workers can communicate with the controller simply by doing remote method invocation. Here is a small example:



from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time

WORKER_ITERATIONS = 100


class Worker(Chare):

    def __init__(self, controller):
        self.controller = controller

    @threaded
    def work(self, x, done_future):
        result = -1
        try:
            for i in range(WORKER_ITERATIONS):
                if i % 20 == 0:
                    # send status update to controller
                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
                if i == 5 and self.thisIndex[0] % 2 == 0:
                    # trigger NameError on even-numbered workers
                    test[3] = 3
                time.sleep(0.01)
            result = x**2
        except Exception as e:
            # send error to controller
            self.controller.collectError(self.thisIndex, e)
        # send result to controller
        self.contribute(result, Reducer.gather, done_future)


# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
    def procNum(self, index):
        return (index[0] % (charm.numPes() - 1)) + 1


class Controller(Chare):

    def __init__(self, args):
        self.startTime = time.time()
        done_future = charm.createFuture()
        # create 12 workers, which are distributed by charm4py among processes
        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
        # start work
        for i in range(12):
            workers[i].work(i, done_future)
        print('Results are', done_future.get())  # wait for result
        exit()

    def progressUpdate(self, worker_id, current_step):
        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
              'progress', current_step * 100 / WORKER_ITERATIONS, '%')
        # the controller can return a value here and the worker would receive it

    def collectError(self, worker_id, error):
        print(round(time.time() - self.startTime, 3), ': Got error', error,
              'from worker', worker_id)


charm.start(Controller)


In this example, the Controller will print status updates and errors as they happen, and it will print the final results from all workers when they are all done. The result for workers that have failed will be -1.



            The number of processes P is given at launch. The runtime will distribute the N workers among the available processes. This happens when the workers are created and there is no dynamic load balancing in this particular example.



            Also, note that in the charm4py model remote method invocation is asynchronous and returns a future which the caller can block on, but only the calling thread blocks (not the whole process).



            Hope this helps.






            share|improve this answer























              Your Answer






              StackExchange.ifUsing("editor", function () {
              StackExchange.using("externalEditor", function () {
              StackExchange.using("snippets", function () {
              StackExchange.snippets.init();
              });
              });
              }, "code-snippets");

              StackExchange.ready(function() {
              var channelOptions = {
              tags: "".split(" "),
              id: "1"
              };
              initTagRenderer("".split(" "), "".split(" "), channelOptions);

              StackExchange.using("externalEditor", function() {
              // Have to fire editor after snippets, if snippets enabled
              if (StackExchange.settings.snippets.snippetsEnabled) {
              StackExchange.using("snippets", function() {
              createEditor();
              });
              }
              else {
              createEditor();
              }
              });

              function createEditor() {
              StackExchange.prepareEditor({
              heartbeatType: 'answer',
              autoActivateHeartbeat: false,
              convertImagesToLinks: true,
              noModals: true,
              showLowRepImageUploadWarning: true,
              reputationToPostImages: 10,
              bindNavPrevention: true,
              postfix: "",
              imageUploader: {
              brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
              contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
              allowUrls: true
              },
              onDemand: true,
              discardSelector: ".discard-answer"
              ,immediatelyShowMarkdownHelp:true
              });


              }
              });














              draft saved

              draft discarded


















              StackExchange.ready(
              function () {
              StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424535%2frunning-multiple-python-scripts-from-a-single-script-and-communicating-back-and%23new-answer', 'question_page');
              }
              );

              Post as a guest















              Required, but never shown

























              3 Answers
              3






              active

              oldest

              votes








              3 Answers
              3






              active

              oldest

              votes









              active

              oldest

              votes






              active

              oldest

              votes









              0














              A better way to do this would be to make use of threads. If you made the script you want to call into a function in this larger script, you could have your main function call this script as many times as you want and have the threads report back with information as needed. You can read a little bit about how threads work here.






              share|improve this answer
























              • Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

                – tripleee
                Nov 22 '18 at 6:19
















              0














              A better way to do this would be to make use of threads. If you made the script you want to call into a function in this larger script, you could have your main function call this script as many times as you want and have the threads report back with information as needed. You can read a little bit about how threads work here.






              share|improve this answer
























              • Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

                – tripleee
                Nov 22 '18 at 6:19














              0












              0








              0







              A better way to do this would be to make use of threads. If you made the script you want to call into a function in this larger script, you could have your main function call this script as many times as you want and have the threads report back with information as needed. You can read a little bit about how threads work here.






              share|improve this answer













              A better way to do this would be to make use of threads. If you made the script you want to call into a function in this larger script, you could have your main function call this script as many times as you want and have the threads report back with information as needed. You can read a little bit about how threads work here.







              share|improve this answer












              share|improve this answer



              share|improve this answer










              answered Nov 22 '18 at 6:02









              Unsolved CypherUnsolved Cypher

              495314




              495314













              • Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

                – tripleee
                Nov 22 '18 at 6:19



















              • Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

                – tripleee
                Nov 22 '18 at 6:19

















              Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

              – tripleee
              Nov 22 '18 at 6:19





              Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.

              – tripleee
              Nov 22 '18 at 6:19













              0














              I suggest to use threading.Thread or multiprocessing.Process despite of requirements.



              Simple way to communicate between Threads / Processes is to use Queue. Multiprocessing module provides some other ways to communicate between processes (Queue, Event, Manager, ...)



              You can see some elementary communication in the example:



              import threading
              from Queue import Queue
              import random
              import time


              class Worker(threading.Thread):
              def __init__(self, name, queue_error):
              threading.Thread.__init__(self)
              self.name = name
              self.queue_error = queue_error

              def run(self):
              time.sleep(random.randrange(1, 10))
              # Do some processing ...
              # Report errors
              self.queue_error.put((self.name, 'Error state'))


              class Launcher(object):
              def __init__(self):
              self.queue_error = Queue()

              def main_loop(self):
              # Start threads
              for i in range(10):
              w = Worker(i, self.queue_error)
              w.start()
              # Check for errors
              while True:
              while not self.queue_error.empty():
              error_data = self.queue_error.get()
              print 'Worker #%s reported error: %s' % (error_data[0], error_data[1])
              time.sleep(0.1)


              if __name__ == '__main__':
              l = Launcher()
              l.main_loop()





              share|improve this answer




























                0














                I suggest to use threading.Thread or multiprocessing.Process despite of requirements.



                Simple way to communicate between Threads / Processes is to use Queue. Multiprocessing module provides some other ways to communicate between processes (Queue, Event, Manager, ...)



                You can see some elementary communication in the example:



                import threading
                from Queue import Queue
                import random
                import time


                class Worker(threading.Thread):
                def __init__(self, name, queue_error):
                threading.Thread.__init__(self)
                self.name = name
                self.queue_error = queue_error

                def run(self):
                time.sleep(random.randrange(1, 10))
                # Do some processing ...
                # Report errors
                self.queue_error.put((self.name, 'Error state'))


                class Launcher(object):
                def __init__(self):
                self.queue_error = Queue()

                def main_loop(self):
                # Start threads
                for i in range(10):
                w = Worker(i, self.queue_error)
                w.start()
                # Check for errors
                while True:
                while not self.queue_error.empty():
                error_data = self.queue_error.get()
                print 'Worker #%s reported error: %s' % (error_data[0], error_data[1])
                time.sleep(0.1)


                if __name__ == '__main__':
                l = Launcher()
                l.main_loop()





                share|improve this answer


























                  0












                  0








                  0







                  I suggest to use threading.Thread or multiprocessing.Process despite of requirements.



                  Simple way to communicate between Threads / Processes is to use Queue. Multiprocessing module provides some other ways to communicate between processes (Queue, Event, Manager, ...)



                  You can see some elementary communication in the example:



                  import threading
                  from Queue import Queue
                  import random
                  import time


                  class Worker(threading.Thread):
                  def __init__(self, name, queue_error):
                  threading.Thread.__init__(self)
                  self.name = name
                  self.queue_error = queue_error

                  def run(self):
                  time.sleep(random.randrange(1, 10))
                  # Do some processing ...
                  # Report errors
                  self.queue_error.put((self.name, 'Error state'))


                  class Launcher(object):
                  def __init__(self):
                  self.queue_error = Queue()

                  def main_loop(self):
                  # Start threads
                  for i in range(10):
                  w = Worker(i, self.queue_error)
                  w.start()
                  # Check for errors
                  while True:
                  while not self.queue_error.empty():
                  error_data = self.queue_error.get()
                  print 'Worker #%s reported error: %s' % (error_data[0], error_data[1])
                  time.sleep(0.1)


                  if __name__ == '__main__':
                  l = Launcher()
                  l.main_loop()





                  share|improve this answer













                  I suggest to use threading.Thread or multiprocessing.Process despite of requirements.



                  Simple way to communicate between Threads / Processes is to use Queue. Multiprocessing module provides some other ways to communicate between processes (Queue, Event, Manager, ...)



                  You can see some elementary communication in the example:



                  import threading
                  from Queue import Queue
                  import random
                  import time


                  class Worker(threading.Thread):
                  def __init__(self, name, queue_error):
                  threading.Thread.__init__(self)
                  self.name = name
                  self.queue_error = queue_error

                  def run(self):
                  time.sleep(random.randrange(1, 10))
                  # Do some processing ...
                  # Report errors
                  self.queue_error.put((self.name, 'Error state'))


                  class Launcher(object):
                  def __init__(self):
                  self.queue_error = Queue()

                  def main_loop(self):
                  # Start threads
                  for i in range(10):
                  w = Worker(i, self.queue_error)
                  w.start()
                  # Check for errors
                  while True:
                  while not self.queue_error.empty():
                  error_data = self.queue_error.get()
                  print 'Worker #%s reported error: %s' % (error_data[0], error_data[1])
                  time.sleep(0.1)


                  if __name__ == '__main__':
                  l = Launcher()
                  l.main_loop()






                  share|improve this answer












                  share|improve this answer



                  share|improve this answer










                  answered Nov 22 '18 at 8:33









                  Peter SvacPeter Svac

                  6613




                  6613























                      0














                      Like someone else said, you have to use multiple processes for true parallelism instead of threads because the GIL limitation prevents threads from running concurrently.



                      If you want to use the standard multiprocessing library (which is based on launching multiple processes), I suggest using a pool of workers. If I understood correctly, you want to launch 100+ parallel instances. Launching 100+ processes on one host will generate too much overhead. Instead, create a pool of P workers where P is for example the number of cores in your machine and submit the 100+ jobs to the pool. This is simple to do and there are many examples on the web. Also, when you submit jobs to the pool, you can provide a callback function to receive errors. This may be sufficient for your needs (there are examples here).



                      The Pool in multiprocessing however can't distribute work across multiple hosts (e.g. cluster of machines) last time I looked. So, if you need to do this, or if you need a more flexible communication scheme, like being able to send updates to the controlling process while the workers are running, my suggestion is to use charm4py (note that I am a charm4py developer so this is where I have experience).



                      With charm4py you could create N workers which are distributed among P processes by the runtime (works across multiple hosts), and the workers can communicate with the controller simply by doing remote method invocation. Here is a small example:



                      from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
                      import time

                      WORKER_ITERATIONS = 100


                      class Worker(Chare):

                      def __init__(self, controller):
                      self.controller = controller

                      @threaded
                      def work(self, x, done_future):
                      result = -1
                      try:
                      for i in range(WORKER_ITERATIONS):
                      if i % 20 == 0:
                      # send status update to controller
                      self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
                      if i == 5 and self.thisIndex[0] % 2 == 0:
                      # trigger NameError on even-numbered workers
                      test[3] = 3
                      time.sleep(0.01)
                      result = x**2
                      except Exception as e:
                      # send error to controller
                      self.controller.collectError(self.thisIndex, e)
                      # send result to controller
                      self.contribute(result, Reducer.gather, done_future)


                      # This custom map is used to prevent workers from being created on process 0
                      # (where the controller is). Not strictly needed, but allows more timely
                      # controller output
                      class WorkerMap(ArrayMap):
                      def procNum(self, index):
                      return (index[0] % (charm.numPes() - 1)) + 1


                      class Controller(Chare):

                      def __init__(self, args):
                      self.startTime = time.time()
                      done_future = charm.createFuture()
                      # create 12 workers, which are distributed by charm4py among processes
                      workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
                      # start work
                      for i in range(12):
                      workers[i].work(i, done_future)
                      print('Results are', done_future.get()) # wait for result
                      exit()

                      def progressUpdate(self, worker_id, current_step):
                      print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
                      'progress', current_step * 100 / WORKER_ITERATIONS, '%')
                      # the controller can return a value here and the worker would receive it

                      def collectError(self, worker_id, error):
                      print(round(time.time() - self.startTime, 3), ': Got error', error,
                      'from worker', worker_id)


                      charm.start(Controller)


                      In this example, the Controller will print status updates and errors as they happen. It
                      will print final results from all workers when they are all done. The result for workers
                      that have failed will be -1.



                      The number of processes P is given at launch. The runtime will distribute the N workers among the available processes. This happens when the workers are created and there is no dynamic load balancing in this particular example.



                      Also, note that in the charm4py model remote method invocation is asynchronous and returns a future which the caller can block on, but only the calling thread blocks (not the whole process).



                      Hope this helps.






                      share|improve this answer




























                        0














                        Like someone else said, you have to use multiple processes for true parallelism instead of threads because the GIL limitation prevents threads from running concurrently.



                        If you want to use the standard multiprocessing library (which is based on launching multiple processes), I suggest using a pool of workers. If I understood correctly, you want to launch 100+ parallel instances. Launching 100+ processes on one host will generate too much overhead. Instead, create a pool of P workers where P is for example the number of cores in your machine and submit the 100+ jobs to the pool. This is simple to do and there are many examples on the web. Also, when you submit jobs to the pool, you can provide a callback function to receive errors. This may be sufficient for your needs (there are examples here).



                        The Pool in multiprocessing however can't distribute work across multiple hosts (e.g. cluster of machines) last time I looked. So, if you need to do this, or if you need a more flexible communication scheme, like being able to send updates to the controlling process while the workers are running, my suggestion is to use charm4py (note that I am a charm4py developer so this is where I have experience).



                        With charm4py you could create N workers which are distributed among P processes by the runtime (works across multiple hosts), and the workers can communicate with the controller simply by doing remote method invocation. Here is a small example:



from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time

WORKER_ITERATIONS = 100


class Worker(Chare):

    def __init__(self, controller):
        self.controller = controller

    @threaded
    def work(self, x, done_future):
        result = -1
        try:
            for i in range(WORKER_ITERATIONS):
                if i % 20 == 0:
                    # send status update to controller
                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
                if i == 5 and self.thisIndex[0] % 2 == 0:
                    # trigger NameError on even-numbered workers
                    test[3] = 3
                time.sleep(0.01)
            result = x**2
        except Exception as e:
            # send error to controller
            self.controller.collectError(self.thisIndex, e)
        # send result to controller
        self.contribute(result, Reducer.gather, done_future)


# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
    def procNum(self, index):
        return (index[0] % (charm.numPes() - 1)) + 1


class Controller(Chare):

    def __init__(self, args):
        self.startTime = time.time()
        done_future = charm.createFuture()
        # create 12 workers, which are distributed by charm4py among processes
        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
        # start work
        for i in range(12):
            workers[i].work(i, done_future)
        print('Results are', done_future.get())  # wait for result
        exit()

    def progressUpdate(self, worker_id, current_step):
        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
              'progress', current_step * 100 / WORKER_ITERATIONS, '%')
        # the controller can return a value here and the worker would receive it

    def collectError(self, worker_id, error):
        print(round(time.time() - self.startTime, 3), ': Got error', error,
              'from worker', worker_id)


charm.start(Controller)


In this example, the Controller will print status updates and errors as they happen, and it will print the final results from all workers when they are all done. The result for workers that have failed will be -1.



                        The number of processes P is given at launch. The runtime will distribute the N workers among the available processes. This happens when the workers are created and there is no dynamic load balancing in this particular example.



                        Also, note that in the charm4py model remote method invocation is asynchronous and returns a future which the caller can block on, but only the calling thread blocks (not the whole process).



                        Hope this helps.






                          answered Dec 2 '18 at 0:09









Juan Galvez
