Очередь процесса с многопоточным или задачами
У меня есть приложение для сообщений о телефонии, в котором есть много сообщений, которые нужно обрабатывать. Поскольку телефонные порты ограничены, сообщение сначала будет обработано первым. Каждое сообщение имеет флаг «Подтверждение», который указывает, какая из обработанных. Конечно, он был инициализирован как ложный.
Я хочу поместить все сообщения в очередь, затем обработать их несколькими streamами или задачами.
public class MessageQueue { public Queue MessageWorkItem { get; set; } public Messages Message { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); Message = new Messages(); } public void GetMessageMetaData() { try { // It is just a test, add only one item into the queue Message.MessageID = Guid.NewGuid(); Message.NumberToCall = "1111111111"; Message.FacilityID = "3333"; Message.NumberToDial = "2222222222"; Message.CountryCode = "1"; Message.Acknowledge = false; } catch (Exception ex) { } } public void AddingItemToQueue() { GetMessageMetaData(); if (!Message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(Message); } } } } public class Messages { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } }
Теперь мой вопрос заключается в том, как удалить объект из очереди с помощью многопоточности. Для каждого элемента из очереди я хочу запустить скрипт.
- Блокирует Console.WriteLine?
- Как защитить ресурсы, которые могут использоваться в многопоточной или асинхронной среде?
- C # версия синхронизированного ключевого слова java?
- Как дождаться завершения ряда streamов?
- Явная блокировка Java
public void RunScript(Message item) { try { PlayMessage(item); return; } catch (HangupException hex) { Log.WriteWithId("Caller Hungup!", hex.Message); } catch (Exception ex) { Log.WriteException(ex, "Unexpected exception: {0}"); } }
Я думал, что если бы
if (MessageWorkItem.Count> = 1) Затем что-то делать, но мне нужна помощь по коду.
- Реализации Java Queue, какой?
- Зеленые темы против не зеленых нитей
- Как остановить stream, созданный при реализации runnable-интерфейса?
- Как использовать свойство CancellationToken?
- Когда использовать Task.Delay, когда использовать Thread.Sleep?
- Запуск задач в foreach Loop использует значение последнего элемента
- Надуть представление в фоновом streamе
- Сколько streamов слишком много?
Если вы можете использовать .Net 4.5, я бы предложил посмотреть Dataflow из параллельной библиотеки задач (TPL) .
Эта страница приводит к многим примерам пошагового руководства, таким как Практическое руководство . Реализация шаблона streamа данных производителя и потребителя. Пошаговое руководство. Использование streamа данных в приложении Windows Forms .
Посмотрите на эту документацию, чтобы узнать, поможет ли она вам. Это довольно много, но я думаю, что это, вероятно, будет вашим лучшим подходом.
Кроме того, вы можете изучить использование BlockingCollection
вместе с GetConsumingEnumerable()
для доступа к элементам в очереди.
Что вы делаете, так это разделить работу на объекты, которые вы хотите обработать, и использовать BlockingCollection для управления очередью.
Некоторые примеры кода, использующие ints
а не объекты в качестве рабочих элементов, помогут продемонстрировать это:
Когда рабочий stream завершит работу с текущим элементом, он удалит новый элемент из рабочей очереди, обработает этот элемент и добавит его в очередь вывода.
Отдельный stream потребителей удаляет завершенные элементы из очереди вывода и что-то делает с ними.
В конце мы должны дождаться завершения всех рабочих (Task.WaitAll (работники)), прежде чем мы сможем пометить выходную очередь как завершенную (outputQueue.CompleteAdding ()).
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Demo { class Program { static void Main(string[] args) { new Program().run(); } void run() { int threadCount = 4; Task[] workers = new Task[threadCount]; Task.Factory.StartNew(consumer); for (int i = 0; i < threadCount; ++i) { int workerId = i; Task task = new Task(() => worker(workerId)); workers[i] = task; task.Start(); } for (int i = 0; i < 100; ++i) { Console.WriteLine("Queueing work item {0}", i); inputQueue.Add(i); Thread.Sleep(50); } Console.WriteLine("Stopping adding."); inputQueue.CompleteAdding(); Task.WaitAll(workers); outputQueue.CompleteAdding(); Console.WriteLine("Done."); Console.ReadLine(); } void worker(int workerId) { Console.WriteLine("Worker {0} is starting.", workerId); foreach (var workItem in inputQueue.GetConsumingEnumerable()) { Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem); Thread.Sleep(100); // Simulate work. outputQueue.Add(workItem); // Output completed item. } Console.WriteLine("Worker {0} is stopping.", workerId); } void consumer() { Console.WriteLine("Consumer is starting."); foreach (var workItem in outputQueue.GetConsumingEnumerable()) { Console.WriteLine("Consumer is using item {0}", workItem); Thread.Sleep(25); } Console.WriteLine("Consumer is finished."); } BlockingCollection inputQueue = new BlockingCollection (); BlockingCollection outputQueue = new BlockingCollection (); } }
Parallel.ForEach от TPL . Это параллель для каждого.
Пример (измененный MessageWorkItem для общей очереди):
public class MessageQueue { public Queue MessageWorkItem { get; set; } public MessageQueue() { MessageWorkItem = new Queue (); } public Message GetMessageMetaData() { try { // It is just a test, add only one item into the queue return new Message() { MessageID = Guid.NewGuid(), NumberToCall = "1111111111", FacilityID = "3333", NumberToDial = "2222222222", CountryCode = "1", Acknowledge = false }; } catch (Exception ex) { return null; } } public void AddingItemToQueue() { var message = GetMessageMetaData(); if (!message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(message); } } } } public class Message { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } class Program { static void Main(string[] args) { MessageQueue me = new MessageQueue(); for (int i = 0; i < 10000; i++) me.AddingItemToQueue(); Console.WriteLine(me.MessageWorkItem.Count); Parallel.ForEach(me.MessageWorkItem, RunScript); } static void RunScript(Message item) { // todo: ... Console.WriteLine(item.MessageID); Thread.Sleep(300); } }