Рубрики
Без рубрики

Примеры очереди блокировки Java

– Примеры очереди блокировки Java

В Java мы можем использовать BlockingQueue для создания очереди, которая является общей как для производителя, так и для потребителя.

  1. Производитель – Генерирует данные и помещает их в очередь.
  2. Потребитель – Удалите данные из очереди.

Реализации BlockingQueue являются потокобезопасными, их можно безопасно использовать с несколькими производителями и несколькими потребителями.

1. Блокирующая очередь

Простой Блокирующая очередь пример, производитель генерирует данные и помещает их в очередь, в то же время потребитель берет данные из той же очереди.

1.1 Производитель – A Запускаемый объект для помещения 20 целых чисел в очередь.

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue queue;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process() throws InterruptedException {

        // Put 20 ints into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer] Put : " + i);
            queue.put(i);
            System.out.println("[Producer] Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
}

1.2 Потребитель – А Запускаемый объект для извлечения элементов из очереди.

package com.mkyong.concurrency.queue.simple.raw;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue queue;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer] Take : " + take);
        Thread.sleep(500);
    }

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
}

1.3 Запустите его. Запустите 1 производителя и 1 потребителя и создайте очередь размером 10.

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.raw.Consumer;
import com.mkyong.concurrency.queue.simple.raw.Producer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue queue = new LinkedBlockingQueue<>(10);

        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }

}

Выход

Производитель не будет пытаться поместить больше данных в очередь, если она заполнена.

[Producer] Put : 0
[Producer] Queue remainingCapacity : 9
[Consumer] Take : 0
[Producer] Put : 1
[Producer] Queue remainingCapacity : 9
[Producer] Put : 2
[Producer] Queue remainingCapacity : 8
[Producer] Put : 3
[Producer] Queue remainingCapacity : 7
[Producer] Put : 4
[Producer] Queue remainingCapacity : 6
[Producer] Put : 5
[Producer] Queue remainingCapacity : 5
[Consumer] Take : 1
[Producer] Put : 6
[Producer] Queue remainingCapacity : 5
[Producer] Put : 7
[Producer] Queue remainingCapacity : 4
[Producer] Put : 8
[Producer] Queue remainingCapacity : 3
[Producer] Put : 9
[Producer] Queue remainingCapacity : 2
[Producer] Put : 10
[Producer] Queue remainingCapacity : 1
[Consumer] Take : 2
[Producer] Put : 11
[Producer] Queue remainingCapacity : 1
[Producer] Put : 12
[Producer] Queue remainingCapacity : 0
[Producer] Put : 13
[Consumer] Take : 3
[Producer] Queue remainingCapacity : 0
[Producer] Put : 14
[Consumer] Take : 4
[Producer] Queue remainingCapacity : 0
[Producer] Put : 15
[Consumer] Take : 5
[Producer] Queue remainingCapacity : 0
[Producer] Put : 16
[Consumer] Take : 6
[Producer] Queue remainingCapacity : 0
[Producer] Put : 17
[Consumer] Take : 7
[Producer] Queue remainingCapacity : 0
[Producer] Put : 18
[Consumer] Take : 8
[Producer] Queue remainingCapacity : 0
[Producer] Put : 19
[Consumer] Take : 9
[Producer] Queue remainingCapacity : 0
[Consumer] Take : 10
[Consumer] Take : 11
[Consumer] Take : 12
[Consumer] Take : 13
[Consumer] Take : 14
[Consumer] Take : 15
[Consumer] Take : 16
[Consumer] Take : 17
[Consumer] Take : 18
[Consumer] Take : 19

Программа не остановится и не выйдет, она будет продолжать работать там, чтобы помещать и брать данные из BlockingQueue

2. Блокирующая очередь + Ядовитая таблетка

“Ядовитая таблетка” – это общее решение для остановки или прерывания потоков как производителей, так и потребителей. Идея заключается в том, что производитель помещает “ядовитую таблетку” в очередь и выходит, если “потребитель” видит “ядовитую таблетку”, затем останавливается и выходит.

2.1 Производитель с раствором ядовитой таблетки.

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ProducerPoison implements Runnable {

    private final BlockingQueue queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            process();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    queue.put(POISON);
                    break;
                } catch (InterruptedException e) {
                    //...
                }
            }
        }

    }

    private void process() throws InterruptedException {

        // Put 20 elements into Queue
        for (int i = 0; i < 20; i++) {
            System.out.println("[Producer] Put : " + i);
            queue.put(i);
            System.out.println("[Producer] Queue remainingCapacity : " + queue.remainingCapacity());
            Thread.sleep(100);
        }

    }

    public ProducerPoison(BlockingQueue queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }

}

2.2 Потребитель с раствором ядовитой таблетки.

package com.mkyong.concurrency.queue.simple.poison;

import java.util.concurrent.BlockingQueue;

public class ConsumerPoison implements Runnable {

    private final BlockingQueue queue;
    private final Integer POISON;

    @Override
    public void run() {

        try {
            while (true) {
                Integer take = queue.take();
                process(take);

                // if this is a poison pill, break, exit
                if (take == POISON) {
                    break;
                }

            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    private void process(Integer take) throws InterruptedException {
        System.out.println("[Consumer] Take : " + take);
        Thread.sleep(500);
    }

    public ConsumerPoison(BlockingQueue queue, Integer POISON) {
        this.queue = queue;
        this.POISON = POISON;
    }
}

2.3 Начните с 2 производителей и 2 потребителей.

package com.mkyong.concurrency.queue.simple;

import com.mkyong.concurrency.queue.simple.poison.ConsumerPoison;
import com.mkyong.concurrency.queue.simple.poison.ProducerPoison;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    public static void main(String[] args) {

        BlockingQueue queue = new LinkedBlockingQueue<>(10);

        //new Thread(new Producer(queue)).start();
        //new Thread(new Consumer(queue)).start();

        Integer poison = -1;
        new Thread(new ProducerPoison(queue, poison)).start();
        new Thread(new ProducerPoison(queue, poison)).start();

        new Thread(new ConsumerPoison(queue, poison)).start();
        new Thread(new ConsumerPoison(queue, poison)).start();

    }

}

Выход

[Producer] Put : 0
[Producer] Put : 0
[//...
[Consumer] Take : 18
[Consumer] Take : 18
[Consumer] Take : 19
[Consumer] Take : 19
[Consumer] Take : -1
[Consumer] Take : -1

Process finished with exit code 0

3. Блокировка очереди + Индексирование файлов

A Блокирующая очередь пример создания простого механизма индексирования файлов. Производитель обходит каталог и помещает имя файла в очередь, в то же время потребитель берет имя файла из той же очереди и индексирует его.

3.1 Производитель.

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;

// Producer
// Crawl file system and put the filename in BlockingQueue.
public class FileCrawlerProducer implements Runnable {

    private final BlockingQueue fileQueue;
    private final FileFilter fileFilter;
    private final File file;
    private final File POISON;
    private final int N_POISON_PILL_PER_PRODUCER;

    @Override
    public void run() {

        try {
            crawl(file);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            while (true) {
                try {
                    System.out.println(Thread.currentThread().getName()
                            + " - FileCrawlerProducer is done, try poison all the consumers!");
                    // poison all threads
                    for (int i = 0; i < N_POISON_PILL_PER_PRODUCER; i++) {
                        System.out.println(Thread.currentThread().getName() + " - puts poison pill!");
                        fileQueue.put(POISON);
                    }
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public FileCrawlerProducer(BlockingQueue fileQueue,
                               FileFilter fileFilter, File file, 
                               File POISON, int n_POISON_PILL_PER_PRODUCER) {
        this.fileQueue = fileQueue;
        this.fileFilter = fileFilter;
        this.file = file;
        this.POISON = POISON;
        N_POISON_PILL_PER_PRODUCER = n_POISON_PILL_PER_PRODUCER;
    }

    private void crawl(File root) throws InterruptedException {

        File[] entries = root.listFiles(fileFilter);
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!isIndexed(entry)) {
                    System.out.println("[FileCrawlerProducer] - Found..." 
                            + entry.getAbsoluteFile());
                    fileQueue.put(entry);
                }
            }
        }

    }

    private boolean isIndexed(File f) {
        return false;
    }

}

3.2 Потребитель.

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.util.concurrent.BlockingQueue;

// Consumer
public class IndexerConsumer implements Runnable {

    private final BlockingQueue fileQueue;
    private final File POISON;

    @Override
    public void run() {

        try {
            while (true) {
                File take = fileQueue.take();
                if (take == POISON) {
                    System.out.println(Thread.currentThread().getName() + " die");
                    break;
                }
                indexFile(take);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

    }

    public void indexFile(File file) {
        if (file.isFile()) {
            System.out.println(Thread.currentThread().getName() 
                    + " [IndexerConsumer] - Indexing..." + file.getAbsoluteFile());
        }

    }

    public IndexerConsumer(BlockingQueue fileQueue, File POISON) {
        this.fileQueue = fileQueue;
        this.POISON = POISON;
    }
}

3.3 Начните с 1 производителя и 2 потребителей.

package com.mkyong.concurrency.queue.crawler;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    private static final File POISON = new File("This is a POISON PILL");

    public static void main(String[] args) {

        int N_PRODUCERS = 1;
        int N_CONSUMERS = 2;//Runtime.getRuntime().availableProcessors();
        int N_POISON_PILL_PER_PRODUCER = N_CONSUMERS / N_PRODUCERS;
        int N_POISON_PILL_REMAIN = N_CONSUMERS % N_PRODUCERS;

        System.out.println("N_PRODUCERS : " + N_PRODUCERS);
        System.out.println("N_CONSUMERS : " + N_CONSUMERS);
        System.out.println("N_POISON_PILL_PER_PRODUCER : " + N_POISON_PILL_PER_PRODUCER);
        System.out.println("N_POISON_PILL_REMAIN : " + N_POISON_PILL_REMAIN);

        //unbound queue, no limit
        BlockingQueue queue = new LinkedBlockingQueue<>();

        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        File root = new File("C:\\users");

        for (int i = 0; i < N_PRODUCERS - 1; i++) {
            new Thread(new FileCrawlerProducer(queue, filter, root,
                    POISON, N_POISON_PILL_PER_PRODUCER)).start();
        }
        new Thread(new FileCrawlerProducer(queue, filter, root, POISON,
                N_POISON_PILL_PER_PRODUCER + N_POISON_PILL_REMAIN)).start();

        for (int i = 0; i < N_CONSUMERS; i++) {
            new Thread(new IndexerConsumer(queue, POISON)).start();
        }

    }
}

Выход

//...
[FileCrawlerProducer] - Found...C:\users\Public\Videos\desktop.ini
Thread-2 [IndexerConsumer] - Indexing...C:\users\Public\Videos\desktop.ini
Thread-0 - FileCrawlerProducer is done, try poison all the consumers!
Thread-0 - puts poison pill!
Thread-0 - puts poison pill!
Thread-1 die
Thread-2 die

Process finished with exit code 0

Скачать Исходный Код

Рекомендации

  1. Проблема производителя–потребителя
  2. Блокирующая очередь javadoc

Оригинал: “https://mkyong.com/java/java-blockingqueue-examples/”