Running multiple Python scripts from a single script, and communicating back and forth between them?
I have a script that I wrote that accepts arguments, and I want to launch multiple simultaneous instances of it (maybe 100+), each with unique arguments. My plan was to write another Python script that launches these subscripts/processes; however, to be effective, that script needs to be able to monitor the subscripts for any errors.
Is there any straightforward way to do this, or a library that offers this functionality? I've been searching for a while and am not having much luck finding anything. Creating subprocesses and multiple threads seems straightforward enough, but I can't really find any guides or tutorials on how to then communicate with those threads/subprocesses.
python multithreading parallel-processing
asked Nov 22 '18 at 5:42
K. James
3 Answers
A better way to do this would be to use threads. If you turn the script you want to call into a function inside the larger script, your main function can call it as many times as you want, each call in its own thread, and have the threads report back with information as needed. You can read a little about how threads work here.
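A minimal sketch of that idea, assuming the script's logic has been moved into a function (`run_job` below is a hypothetical stand-in, not code from the question). `concurrent.futures.ThreadPoolExecutor` calls the function once per argument, and an exception raised inside a thread is re-raised by `future.result()` in the main thread, so the launcher can record which jobs failed:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_job(arg):
    """Hypothetical stand-in for the body of the original script."""
    if arg % 7 == 0:
        raise ValueError("bad argument: %d" % arg)
    return arg * 2


def launch(args, max_workers=8):
    """Run run_job once per argument; split outcomes into results and errors."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the argument it was started with
        futures = {pool.submit(run_job, a): a for a in args}
        for future in as_completed(futures):
            arg = futures[future]
            try:
                results[arg] = future.result()
            except Exception as exc:  # raised inside the worker thread
                errors[arg] = exc
    return results, errors


if __name__ == "__main__":
    results, errors = launch(range(1, 15))
    print("succeeded:", sorted(results))
    print("failed:", {arg: str(exc) for arg, exc in errors.items()})
```

The `max_workers=8` value is arbitrary; threads suit I/O-bound jobs best, since CPU-bound Python code will not run in parallel across threads.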
answered Nov 22 '18 at 6:02
Unsolved Cypher
Threads are limited by the GIL though. For truly distributed processing you want some sort of IPC, perhaps even across the network.
– tripleee
Nov 22 '18 at 6:19
I suggest using threading.Thread or multiprocessing.Process, depending on your requirements.
A simple way to communicate between threads or processes is to use a Queue. The multiprocessing module provides several other ways to communicate between processes (Queue, Event, Manager, ...).
You can see some elementary communication in this example:
import threading
from queue import Queue  # Python 3 name; in Python 2 the module was called Queue
import random
import time


class Worker(threading.Thread):
    def __init__(self, name, queue_error):
        threading.Thread.__init__(self)
        self.name = name
        self.queue_error = queue_error

    def run(self):
        time.sleep(random.randrange(1, 10))
        # Do some processing ...
        # Report errors
        self.queue_error.put((self.name, 'Error state'))


class Launcher(object):
    def __init__(self):
        self.queue_error = Queue()

    def main_loop(self):
        # Start threads
        workers = [Worker(i, self.queue_error) for i in range(10)]
        for w in workers:
            w.start()
        # Check for errors until every worker has finished and the queue is drained
        while any(w.is_alive() for w in workers) or not self.queue_error.empty():
            while not self.queue_error.empty():
                error_data = self.queue_error.get()
                print('Worker #%s reported error: %s' % (error_data[0], error_data[1]))
            time.sleep(0.1)


if __name__ == '__main__':
    l = Launcher()
    l.main_loop()
answered Nov 22 '18 at 8:33
Peter Svac
Like someone else said, for true parallelism you have to use multiple processes instead of threads, because the GIL prevents threads from executing Python code in parallel.
If you want to use the standard multiprocessing library (which is based on launching multiple processes), I suggest using a pool of workers. If I understood correctly, you want to launch 100+ parallel instances. Launching 100+ processes on one host will generate too much overhead. Instead, create a pool of P workers, where P is, for example, the number of cores in your machine, and submit the 100+ jobs to the pool. This is simple to do and there are many examples on the web. Also, when you submit jobs to the pool, you can provide a callback function to receive errors. This may be sufficient for your needs (there are examples here).
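As a rough sketch of the pool approach (Python 3 only, since it relies on the `error_callback` argument of `Pool.apply_async`): the `job` function and the failure rule are hypothetical, made up for illustration. The callbacks run in the parent process, so the launcher can collect results and errors in ordinary lists.

```python
import multiprocessing


def job(arg):
    """Hypothetical stand-in for the real work; fails on multiples of 10."""
    if arg % 10 == 0:
        raise ValueError("job %d failed" % arg)
    return arg * 2


def run_all(args, processes=None):
    """Submit every job to a pool; collect results and errors via callbacks."""
    results, errors = [], []
    # processes=None lets Pool pick os.cpu_count() worker processes
    with multiprocessing.Pool(processes=processes) as pool:
        pending = [pool.apply_async(job, (a,),
                                    callback=results.append,
                                    error_callback=errors.append)
                   for a in args]
        for p in pending:
            p.wait()  # block until this job has finished, successfully or not
    return results, errors


if __name__ == "__main__":
    results, errors = run_all(range(1, 101))
    print(len(results), "jobs succeeded")
    for exc in errors:
        print("error:", exc)
```

Here 100 jobs share a handful of worker processes, and each exception raised in a worker is pickled, sent back, and handed to `error_callback` in the parent.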
The Pool in multiprocessing, however, couldn't distribute work across multiple hosts (e.g. a cluster of machines) the last time I looked. So if you need that, or if you need a more flexible communication scheme, such as being able to send updates to the controlling process while the workers are running, my suggestion is to use charm4py (note that I am a charm4py developer, so this is where I have experience).
With charm4py you could create N workers, which the runtime distributes among P processes (this works across multiple hosts), and the workers can communicate with the controller simply by doing remote method invocation. Here is a small example:
from charm4py import charm, Chare, Group, Array, ArrayMap, Reducer, threaded
import time

WORKER_ITERATIONS = 100


class Worker(Chare):

    def __init__(self, controller):
        self.controller = controller

    @threaded
    def work(self, x, done_future):
        result = -1
        try:
            for i in range(WORKER_ITERATIONS):
                if i % 20 == 0:
                    # send status update to controller
                    self.controller.progressUpdate(self.thisIndex, i, ret=True).get()
                if i == 5 and self.thisIndex[0] % 2 == 0:
                    # trigger NameError on even-numbered workers
                    test[3] = 3
                time.sleep(0.01)
            result = x**2
        except Exception as e:
            # send error to controller
            self.controller.collectError(self.thisIndex, e)
        # send result to controller
        self.contribute(result, Reducer.gather, done_future)


# This custom map is used to prevent workers from being created on process 0
# (where the controller is). Not strictly needed, but allows more timely
# controller output
class WorkerMap(ArrayMap):
    def procNum(self, index):
        return (index[0] % (charm.numPes() - 1)) + 1


class Controller(Chare):

    def __init__(self, args):
        self.startTime = time.time()
        done_future = charm.createFuture()
        # create 12 workers, which are distributed by charm4py among processes
        workers = Array(Worker, 12, args=[self.thisProxy], map=Group(WorkerMap))
        # start work
        for i in range(12):
            workers[i].work(i, done_future)
        print('Results are', done_future.get())  # wait for result
        exit()

    def progressUpdate(self, worker_id, current_step):
        print(round(time.time() - self.startTime, 3), ': Worker', worker_id,
              'progress', current_step * 100 / WORKER_ITERATIONS, '%')
        # the controller can return a value here and the worker would receive it

    def collectError(self, worker_id, error):
        print(round(time.time() - self.startTime, 3), ': Got error', error,
              'from worker', worker_id)


charm.start(Controller)
In this example, the Controller will print status updates and errors as they happen. It will print final results from all workers when they are all done. The result for workers that have failed will be -1.
The number of processes P is given at launch. The runtime will distribute the N workers among the available processes. This happens when the workers are created and there is no dynamic load balancing in this particular example.
Also, note that in the charm4py model remote method invocation is asynchronous and returns a future which the caller can block on, but only the calling thread blocks (not the whole process).
Hope this helps.
answered Dec 2 '18 at 0:09
Juan Galvez