Considerar una cola de la celebración de un mucho de trabajos que se deben procesar. La limitación de la cola se puede conseguir solamente 1 puesto de trabajo en un tiempo y no hay manera de saber cuántos puestos de trabajo que hay. La toma de los trabajos de 10s para completar e incluyen un montón de espera de las respuestas de los servicios web por lo que no es la CPU.

Si puedo usar algo como esto

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Entonces es rabiosamente pop trabajos de la cola mucho más rápido de lo que se puede completar, ejecutar fuera de la memoria y la caída en su culo. >.<

No los puedo usar (no creo) ParallelOptions.MaxDegreeOfParallelism porque no puedo usar Paralelo.Invocar o en Paralelo.ForEach

3 alternativas que he encontrado

  1. Tarea De Sustitución.De la fábrica.StartNew con

    Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
    task.Start();

    Que se parece un poco a resolver el problema, pero no estoy claro exactamente lo que está haciendo esto y si este es el mejor método.

  2. Crear un programador de tareas personalizado que limita el grado de simultaneidad

  3. Usar algo como BlockingCollection agregar los trabajos de la recolección, cuando se inicia y quitar cuando haya terminado para limitar el número que se puede ejecutar.

Con el #1 yo tengo la confianza en que la decisión correcta es realizado automáticamente, #2/#3 tengo que trabajar el máximo número de tareas que se pueden ejecutar a mí mismo.

He entendido correctamente – que es la mejor manera, o hay otra manera?

EDITAR – Esto es lo que he llegado a partir de las respuestas a continuación, productor-consumidor patrón.

Así como el rendimiento general objetivo no era para quitar de la cola de trabajos más rápido de lo que podría ser procesados y no tener varios hilos de votación de la cola (no se muestra aquí, pero eso es un no-bloqueo de la op y dará lugar a enormes costos de transacción si se encuestó a una alta frecuencia de varios lugares).

//BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
//point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

//Setup a number of consumer threads.
//Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
//in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

//Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       //May need to wait for running jobs to finish
       break;
    }
}
  • Así Queue.PopJob() de alguna manera crea los puestos de trabajo «de la nada»? No se pueden sostener en una colección en la memoria, o algo así, ¿verdad?
  • Casi nada – de la ‘nube’ en este caso (Azure cola) a pesar de que yo estaba tratando de mantener la más general que el
  • #2 no se va a limitar el número de la ejecución (y, por tanto, hilos activos) pero va a sorber todo en la cola mucho más rápido de lo que puede procesar.
  • Siempre he querido una Tarea asíncrona Equivalente a su respuesta de subprocesos de ejecución larga. Debido a que la mayoría de las redes de Bibliotecas han Async()1 option which in theory is more efficient than running 8 threads. How does IIS` de la manija de la creación de la tarea en el ASP.NET pipeline. Porque eso es esencialmente lo que estamos tratando de simular?
InformationsquelleAutor Ryan | 2012-06-21

6 Comentarios

  1. 22

    Sólo me dio un respuesta que es muy aplicable a esta pregunta.

    Básicamente, el TPL clase de Tarea se hace para programar la CPU de trabajo. No está hecha para el bloqueo de trabajo.

    Se trabaja con un recurso que no es de la CPU: la espera para el servicio de las respuestas. Esto significa que el TPL se mismange su recurso, porque se supone CPU acotamiento a un cierto grado.

    Gestionar los recursos de ti mismo: el Inicio de un número determinado de hilos o Duradera tareas (que es básicamente el mismo). Decidir sobre el número de hilos empíricamente.

    No se puede poner fiable de los sistemas en producción. Por esa razón, recomiendo el #1, pero estrangulado. No crear tantos hilos como hay elementos de trabajo. Crear tantos hilos que son necesarios para saturar el servicio remoto. Escriba una función auxiliar que genera N hilos y los utiliza para el proceso de M elementos de trabajo. Usted obtener totalmente predecible y fiable de los resultados de esa manera.

    • Así que para confirmar que usted está diciendo usar fija los hilos o la opción #1 de orig Q?
    • La opción 1 es una idea realmente mala. Esto significa la creación de miles de hilos, usted nunca debe hacer nada de eso.
    • Recomiendo #1 pero estrangulado no crear tantos hilos como hay elementos de trabajo. Crear tantos hilos que son necesarios para saturar el servicio remoto. Escriba una función auxiliar que genera N hilos y los utiliza para el proceso de M elementos de trabajo. Usted obtener totalmente predecible y fiable de los resultados de esa manera.
    • Así que la limitación de la cantidad de hilos ‘activo’ en cualquier momento el uso de un método como el #3 (BlockingCollection) o no lo tienen algo más en mente?
    • no está seguro de cómo funcionaría exactamente. Usted no quiere sólo un subproceso de trabajo sobre los elementos de trabajo. Usted también no desea un hilo de iniciar los trabajos tan rápido como puede, ya que desencadena dijo TPL problemas. Cada solución donde no se aseguran de que un número fijo de subprocesos se está ejecutando no es correcto, en mi opinión.
    • Si vas a crear un hilo por cada ejecución de tareas, pero las tareas que se están gastando un montón de tiempo de espera en la red de respuestas, no regalando el robo de trabajo beneficios de los TPL de la biblioteca general ya que, esencialmente, el hilo se van a pasar un montón de tiempo de espera de finalización de e/S y estar inactivo? Perdería mucho de los beneficios del uso de async / await y son igual de buenas recurrir de nuevo a el bloqueo de llamadas de la red en .NETA?
    • su obra no es la CPU. Cualquier optimización de consumo de CPU es discutible, le ayuda a nada. Si tu Cpu no está vinculado a un montón de tiempo que no va a ser el trabajo de robar de todos modos, ya la piscina colas son casi siempre vacía. Async IO generalmente consume más CPU de sincronización IO (difícil de creer pero cierto). En la OPs caso esto es todo acerca de la optimización de IO patrones. Si el IOs se inició sync o async no importa en absoluto para su funcionamiento.

  2. 12

    Potencial de flujo se divide y las continuaciones causada por await, más adelante en el código o en una 3ª parte de la biblioteca, no se jugar muy bien con las tareas de ejecución prolongada (o subprocesos), así que no te molestes en uso de las tareas de ejecución prolongada. En el async/await mundo, son inútiles. Más detalles aquí.

    Puede llamar ThreadPool.SetMaxThreads pero antes de hacer esta llamada, asegúrese de que establece el número mínimo de subprocesos con ThreadPool.SetMinThreads, con valores por debajo o igual a la máxima queridos. Y por cierto, la documentación de MSDN está mal. Usted PUEDE ir a continuación del número de núcleos en su máquina con esas llamadas de método, al menos en .NET 4.5 y 4.6, donde he utilizado esta técnica para reducir la potencia de procesamiento de una memoria limitada de 32 bits de servicio.

    Sin embargo, si usted no desea restringir el conjunto de la aplicación, pero sólo el procesamiento de parte de ella, un programador de tareas personalizado para hacer el trabajo. Hace mucho tiempo, microsoft lanzó la actualización las muestras con varios personalizado programadores de tareas, incluyendo un LimitedConcurrencyLevelTaskScheduler. Spawn la principal tarea de procesamiento manualmente con Task.Factory.StartNew, proporcionando el programador de tareas personalizado, y toda otra tarea generado por la que va a usar, incluyendo async/await e incluso Task.Yield, utiliza para la consecución de asynchronousy temprano en un async método.

    Pero para tu caso en particular, tanto las soluciones no deja de agotar su cola de trabajos antes de su realización. Que puede no ser deseable, dependiendo de la aplicación y el propósito de que la cola de la suya. Son más como «fuego a un montón de tareas y dejar que el programador de encontrar el tiempo para ejecutarlas» tipo de soluciones. Así que tal vez algo un poco más apropiado en este caso podría ser un método más estrictas de control sobre la ejecución de los trabajos a través de semaphores. El código sería este:

    semaphore = new SemaphoreSlim(max_concurrent_jobs);
    
    while(...){
     job = Queue.PopJob();
     semaphore.Wait();
     ProcessJobAsync(job);
    }
    
    async Task ProcessJobAsync(Job job){
     await Task.Yield();
     ... Process the job here...
     semaphore.Release();
    }

    Hay más de una manera para la piel de un gato. El uso de lo que usted cree que es apropiado.

  3. 8

    Microsoft tiene una muy buena biblioteca llamada de flujo de datos que hace exactamente lo que usted desea (y mucho más). Detalles aquí.

    Debe utilizar el ActionBlock clase y la MaxDegreeOfParallelism de la ExecutionDataflowBlockOptions objeto. ActionBlock juega muy bien con async/await, por lo que incluso cuando sus llamadas externas se esperaba, no hay nuevos puestos de trabajo comenzará a procesar.

    ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
    {
         MaxDegreeOfParallelism = 10
    };
    
    this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
                actionBlockOptions);
    ...
    this.sendToAzureActionBlock.Post(itemsToProcess)
    • Alguien ha tenido éxito con esta solución?
    • Yo tenía, que me escribió esto…
    • Esta es la elección correcta en mi opinión. No sólo se puede controlar fácilmente el paralelismo, pero la cadena de acciones se realiza DESPUÉS de que las tareas se han completado (presentación de resultados). Avanzado de modelos tales como «Productor-Consumidor» son muy fáciles de aplicar. Por el problema original, las llamadas a los servicios web pueden devolver resultados que sean objeto de tratamiento de aguas abajo a medida que estén disponibles. Usted puede fácilmente ACELERADOR de que funcione tan bien.
    • Por CIERTO: Usted puede utilizar MaxDegreeOfParallelism y esperan llamadas a SendAsync(x) en lugar de Post(x) en el Bloque que se está ejecutando la función lambda (acciones o funciones que operan sobre los datos o los objetos pasados en). En el extremo de aguas abajo usted puede ReceiveAsync() que devuelve Tarea<T>, y, a continuación, agregue un ContinueWith(Acción<Tarea<T>>).
  4. 7

    El problema aquí no parece haber demasiadas ejecución Tasks, es demasiado muchos programada Tasks. El código se trate de programar como muchos Tasks como se puede, no importa lo rápido que se ejecuta. Y si usted tiene demasiados puestos de trabajo, esto significa que usted conseguirá OOM.

    Debido a esto, ninguna de las soluciones propuestas realmente va a resolver su problema. Si parece que simplemente especificando LongRunning resuelve el problema, entonces es más probable debido a la creación de un nuevo Thread (que es lo que LongRunning hace) toma algo de tiempo, lo que efectivamente limita la obtención de nuevos puestos de trabajo. Así, esta solución sólo funciona por accidente, y lo más probable dar lugar a otros problemas más adelante.

    Con respecto a la solución, que en su mayoría están de acuerdo con usr: la solución más sencilla que funciona razonablemente bien es crear un número fijo de LongRunning tareas y tener un bucle que llama Queue.PopJob() (protegido por un lock si ese método no es thread-safe) y Execute()s el trabajo.

    ACTUALIZACIÓN: Después de algo más de pensar, me di cuenta de que el siguiente intento más probable es que se comportan terriblemente. Utilícelo sólo si estás realmente seguro de que va a funcionar bien para usted.


    Pero el TPL intenta averiguar el mejor grado de paralelismo, incluso para IO-bound Tasks. Así, puede probar a utilizar para su ventaja. Largo Tasks no trabajo aquí, porque desde el punto de vista de los TPL, parece que no se trabaja y comenzará de nuevo Tasks más y más. Lo que usted puede hacer en su lugar es para iniciar una nueva Task al final de cada Task. De esta manera, TPL sabrá lo que está pasando y su algoritmo puede funcionar bien. También, para que la TPL decidir el grado de paralelismo, en el inicio de un Task que es el primero en su línea, iniciar otra línea de Tasks.

    Este algoritmo puede funcionan bien. Pero también es posible que el TPL va a tomar una mala decisión en cuanto al grado de paralelismo, realmente no he intentado nada como esto.

    En el código, tendría este aspecto:

    void ProcessJobs(bool isFirst)
    {
        var job = Queue.PopJob(); //assumes PopJob() is thread-safe
        if (job == null)
            return;
    
        if (isFirst)
            Task.Factory.StartNew(() => ProcessJobs(true));
    
        job.Execute();
    
        Task.Factory.StartNew(() => ProcessJob(false));
    }

    Y comenzar con

    Task.Factory.StartNew(() => ProcessJobs(true));
    • Muchos de sus puntos son correctos. Sólo una cosa desde mi experiencia: El TPL se muy gestionar mal la cantidad de hilos. En ambas direcciones. Esto es demasiado fiable para poner en producción.
    • Gran punto sobre Duradera sólo accidentalmente resolver el problema – es por eso que yo era cauteloso acerca de esto como yo adaptador realmente no entender lo que estaba pasando en el bajo de la campana. Tiene perfecto sentido ahora y de hecho se comportan exactamente como usted la describe – acaba de crear 1500 nuevos hilos de ejecución en un equipo dual core en mi prueba!
    • Sin embargo – su ProcessJobs ejemplo me pone muy nervioso así, ¿quién puede saber cómo se comportan en diferentes circunstancias.
    • Bueno, me gusta la idea detrás de ProcessJobs(): dejar que el TPL encontrar el mejor grado de paralelismo basado en los datos empíricos. Pero sí, antes de implementar algo así en la producción, me volvería a probar mucho. Y es muy posible que no se comportan bien.
    • Hay un error en el código que se va a crear una cantidad infinita de tareas en las 2 primeras líneas.
    • Tienes razón, gracias por darse cuenta de que, fijo.

  5. 1

    TaskCreationOptions.LongRunning es útil para el bloqueo de tareas y el uso que aquí es legítimo. Lo que hace es que se sugiere para el programador de dedicar un hilo a la tarea. El programador de sí misma trata de mantener el número de hilos en el mismo nivel que el número de núcleos de CPU para evitar el excesivo cambio de contexto.

    Es bien descrito en Roscado en C# por José Albahari

    • Eso no es lo que está sucediendo en mis pruebas – partida cientos de personas de los hilos (de doble núcleo de la cpu) en una Tarea con un montón de espera de e/S, e incluso si puedo hacer las Tareas del 100% de la CPU (el primer número calc trabajo inútil sin dormir o bloqueo) sigue a partir de 14 hilos a la vez que no es muy eficiente.
    • Debido a su propio juicio es que el 14 es demasiado, no creo que hay una manera de transmitir la decisión sobre cuántos hilos, es suficiente para cualquier sistema de automatización. Usted debe administrar a sí mismo. Spawn Tareas en un bucle pero el uso de un semáforo para limitar el número de tareas a todo lo que veas oportuno.
  6. 1

    Puedo usar una cola de mensajes/buzón mecanismo para lograr esto. Es parecido al actor modelo. Tengo una clase que tiene un Buzón de correo. Yo llamo a esta clase de mi «trabajo». Se pueden recibir mensajes. Los mensajes están en la cola y que, esencialmente, definir las tareas que quiero que el trabajador ejecute. El trabajador hará uso de la Tarea.Wait() para su Tarea de terminar antes de sacarlo de la cola el siguiente mensaje y empezar la tarea siguiente.

    Limitando el número de trabajadores que tengo, soy capaz de limitar el número de subprocesos simultáneos/tareas que se ejecutan.

    Esto se describe, con el código fuente, en mi blog en un distribuida compute engine. Si miramos el código para IActor y la WorkerNode, espero que tenga sentido.

    https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/

Dejar respuesta

Please enter your comment!
Please enter your name here