Concurrent Programming

baltasarb edited this page Nov 8, 2018 · 17 revisions

Table of Contents

Synchronization with implicit and explicit monitors

1. Implement the semaphore SimpleSemaphore, using implicit or explicit monitors, with the following public interface:

public class SimpleSemaphore{
    public SimpleSemaphore(int initialUnits);
    public void acquire();
    public void release();
}

The constructor receives the number of initially available units, initialUnits, as an argument. The method acquire should acquire exactly one unit from the semaphore if the number of available units is greater than zero. Otherwise, it should block until a unit becomes available.

The method release should return one unit to the semaphore, allowing another call to acquire to succeed. This method is non-blocking.
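As a rough illustration of the behaviour described above (not the linked solution), a minimal sketch on an implicit monitor could look like the following. It assumes that acquire reports interruption with an InterruptedException, which the interface above does not declare, and it leaves the class package-private for brevity.

```java
// Minimal sketch of SimpleSemaphore on the object's intrinsic (implicit) monitor.
final class SimpleSemaphore {
    private int units; // available units, guarded by this

    public SimpleSemaphore(int initialUnits) {
        if (initialUnits < 0) {
            throw new IllegalArgumentException("initialUnits must be >= 0");
        }
        this.units = initialUnits;
    }

    // Assumption: interruption is propagated to the caller.
    public synchronized void acquire() throws InterruptedException {
        try {
            // Loop guards against spurious wake-ups and barging threads.
            while (units == 0) {
                wait();
            }
        } catch (InterruptedException e) {
            // Defensive: pass a possibly consumed notification on to another waiter.
            if (units > 0) {
                notify();
            }
            throw e;
        }
        units--;
    }

    // Never blocks beyond acquiring the monitor.
    public synchronized void release() {
        units++;
        notify(); // all waiters wait for the same condition, so one wake-up is enough
    }
}
```

Because every waiter waits for the same condition and each release frees exactly one unit, notify is sufficient here; notifyAll would also be correct but wakes more threads than necessary.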

Solution

Tests

2. Implement in either C# or Java, using implicit or explicit monitors, the synchronizer keyed exchanger, which has the following public interface:

public class KeyedExchanger<T> {
    public Optional<T> exchange(int key, T myData, int timeout) throws InterruptedException;
}

The synchronizer supports information exchange between pairs of threads identified by a key. Threads that use this synchronizer signal their availability for an exchange by calling the method exchange, specifying the key that identifies the pair (key), the object they wish to give to the partner thread (myData) and, optionally, the maximum time they are willing to wait for the exchange to succeed (timeout).

The method exchange terminates:

  • returning an Optional with a value, when a successful exchange with another thread occurs; the value is the object offered by the partner thread.
  • returning an empty Optional, if the waiting time is exceeded.
  • throwing an InterruptedException (ThreadInterruptedException in C#) when the wait is interrupted.
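The three outcomes above can be sketched in Java on an implicit monitor, as below. This is only an illustration under stated assumptions: timeout is taken to be in milliseconds, exchanged objects are assumed non-null (Optional.of rejects null), and the Offer helper class is a hypothetical name introduced for the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

final class KeyedExchanger<T> {
    // Hypothetical helper: the offer left by the first thread to arrive for a key.
    private static final class Offer<T> {
        final T first;  // data from the thread that arrived first
        T second;       // data from the partner, set when the exchange completes
        boolean done;
        Offer(T first) { this.first = first; }
    }

    // At most one waiting offer per key; guarded by this.
    private final Map<Integer, Offer<T>> pending = new HashMap<>();

    public synchronized Optional<T> exchange(int key, T myData, int timeout)
            throws InterruptedException {
        Offer<T> offer = pending.remove(key);
        if (offer != null) {           // a partner is already waiting: complete the exchange
            offer.second = myData;
            offer.done = true;
            notifyAll();               // one monitor serves all keys, so wake every waiter
            return Optional.of(offer.first);
        }
        offer = new Offer<>(myData);
        pending.put(key, offer);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        try {
            while (!offer.done) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {  // timed out: withdraw the offer
                    pending.remove(key, offer);
                    return Optional.empty();
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return Optional.of(offer.second);
        } catch (InterruptedException e) {
            if (offer.done) {          // the exchange already happened: honour it
                Thread.currentThread().interrupt();
                return Optional.of(offer.second);
            }
            pending.remove(key, offer);
            throw e;
        }
    }
}
```

A single monitor with notifyAll keeps the sketch short but wakes waiters of unrelated keys; an explicit monitor with one Condition per offer would avoid those extra wake-ups.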

Solution
Exchanger Tests
KeyedExchanger Tests

3. Implement in either C# or Java the class EventBus with the following public interface:

public class EventBus {   
    public EventBus(int maxPending);   
    public void SubscribeEvent<T>(Action<T> handler) where T: class;   
    public void PublishEvent<E>(E message) where E: class;   
    public void Shutdown(); 
} 

This class provides a mechanism through which system events (e.g., user registered successfully, database connection failed) are published to every subscriber interested in that type of event. For instance, the system can have a component subscribed to the successful registration of a user in order to send that user a welcome email.
Publishing an event consists of sending an object, called the message, to every subscriber registered for that type of object.

The method SubscribeEvent registers a handler to be executed whenever a message of type T is published. The handler runs on the thread that registered it, that is, the thread that called SubscribeEvent. The call therefore blocks, returning only after a shutdown or a thread interruption.

The method PublishEvent, which never blocks the calling thread, sends the specified message to the bus so that it is processed by every handler registered up to that moment, whether or not the handler is currently processing another message. If there are more than maxPending messages waiting to be processed by the same handler, PublishEvent should discard the event for that specific handler. SubscribeEvent should throw a ThreadInterruptedException if the calling thread is interrupted while waiting or during the handler execution. After Shutdown is called, calls to PublishEvent should throw InvalidOperationException, and all calls to SubscribeEvent should return once all published messages have been processed. The Shutdown method should block the calling thread until the shutdown process is completed, that is, until the processing of all messages given to the bus is finished.
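The interface above is given in C#; as a hedged illustration of the same semantics in Java, the sketch below uses an implicit monitor. Several choices are assumptions made for the sketch: the Class<T> parameter replaces the C# generic constraint, IllegalStateException stands in for InvalidOperationException, InterruptedException stands in for ThreadInterruptedException, shutdown is allowed to throw InterruptedException, a subscriber receives any message that is an instance of its type, and each subscriber queues at most maxPending messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

final class EventBus {
    // Hypothetical helper: per-subscriber queue of messages not yet handled.
    private static final class Subscriber {
        final Class<?> type;
        final Deque<Object> pending = new ArrayDeque<>();
        Subscriber(Class<?> type) { this.type = type; }
    }

    private final int maxPending;
    private final List<Subscriber> subscribers = new ArrayList<>(); // guarded by this
    private boolean shutdown;                                       // guarded by this

    public EventBus(int maxPending) { this.maxPending = maxPending; }

    // Blocks the caller, which runs the handler itself, until shutdown or interruption.
    public <T> void subscribeEvent(Class<T> type, Consumer<? super T> handler)
            throws InterruptedException {
        Subscriber self = new Subscriber(type);
        synchronized (this) {
            if (shutdown) return;
            subscribers.add(self);
        }
        try {
            while (true) {
                Object message;
                synchronized (this) {
                    while (self.pending.isEmpty()) {
                        if (shutdown) return;   // drained and shut down
                        wait();
                    }
                    message = self.pending.removeFirst();
                }
                handler.accept(type.cast(message));  // runs outside the monitor
                if (Thread.interrupted()) throw new InterruptedException();
            }
        } finally {
            synchronized (this) {
                subscribers.remove(self);
                notifyAll();                    // lets shutdown() observe the departure
            }
        }
    }

    // Never waits for handlers; drops the message for subscribers whose queue is full.
    public synchronized void publishEvent(Object message) {
        if (shutdown) throw new IllegalStateException("bus is shut down");
        for (Subscriber s : subscribers) {
            if (s.type.isInstance(message) && s.pending.size() < maxPending) {
                s.pending.addLast(message);
            }
        }
        notifyAll();
    }

    // Blocks until every subscriber has drained its queue and returned.
    public synchronized void shutdown() throws InterruptedException {
        shutdown = true;
        notifyAll();
        while (!subscribers.isEmpty()) wait();
    }
}
```

Handlers run outside the monitor so that a slow handler does not delay publishers or other subscribers.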

Solution
Tests

4. Implement in either C# or Java, using implicit or explicit monitors, the synchronizer message queue, which has the following public interface in Java:

public class MessageQueue<T> {   
    public SendStatus send(T sentMsg);   
    public Optional<T> receive(int timeout) throws InterruptedException; 
} 
public interface SendStatus {
    boolean isSent();
    boolean tryCancel();
    boolean await(int timeout) throws InterruptedException;
}

This synchronizer allows communication between producer threads and consumer threads.
The operation send delivers the message sentMsg to the queue and terminates immediately, returning an object that implements the interface SendStatus. This object allows the caller to synchronize with the delivery of the respective message to another thread.
The isSent method returns:

  • true if the message was delivered to another thread.
  • false otherwise.

The method await synchronizes with the message delivery and terminates:

  • returning true when the message is received by a consumer thread.
  • returning false if the waiting time is exceeded.
  • throwing an InterruptedException if the thread is interrupted while blocked.

The method tryCancel tries to remove the message from the queue and returns whether the removal succeeded (it may no longer be possible).
The method receive retrieves a message from the queue and terminates:

  • returning an Optional with the message, in case of success.
  • returning an empty Optional if the waiting time is exceeded.
  • throwing an InterruptedException if the thread is interrupted while blocked.

The implementation of this synchronizer should minimize the number of thread context switches that occur under the different circumstances.
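A minimal sketch of this interface on a single implicit monitor is shown below, as an illustration rather than the linked solution. It assumes timeouts are in milliseconds and messages are non-null, and the Message inner class is a hypothetical name. It does not attempt the context-switch optimization: one shared monitor with notifyAll wakes more threads than strictly necessary.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

interface SendStatus {
    boolean isSent();
    boolean tryCancel();
    boolean await(int timeout) throws InterruptedException;
}

final class MessageQueue<T> {
    private final Object monitor = new Object();
    private final Deque<Message> queue = new ArrayDeque<>(); // guarded by monitor

    // Hypothetical helper: a queued message that doubles as its own SendStatus.
    private final class Message implements SendStatus {
        private final T payload;
        private boolean sent; // guarded by monitor

        Message(T payload) { this.payload = payload; }

        public boolean isSent() {
            synchronized (monitor) { return sent; }
        }

        public boolean tryCancel() {
            // Succeeds only while the message is still in the queue.
            synchronized (monitor) { return !sent && queue.remove(this); }
        }

        public boolean await(int timeout) throws InterruptedException {
            synchronized (monitor) {
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
                while (!sent) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) return false;
                    TimeUnit.NANOSECONDS.timedWait(monitor, remaining);
                }
                return true;
            }
        }
    }

    // Enqueues and returns immediately.
    public SendStatus send(T sentMsg) {
        synchronized (monitor) {
            Message m = new Message(sentMsg);
            queue.addLast(m);
            monitor.notifyAll(); // wake blocked receivers
            return m;
        }
    }

    public Optional<T> receive(int timeout) throws InterruptedException {
        synchronized (monitor) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
            while (queue.isEmpty()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) return Optional.empty();
                TimeUnit.NANOSECONDS.timedWait(monitor, remaining);
            }
            Message m = queue.removeFirst();
            m.sent = true;
            monitor.notifyAll(); // wake senders blocked in await
            return Optional.of(m.payload);
        }
    }
}
```

To reduce context switches, an explicit monitor (ReentrantLock) with separate Condition objects for receivers and for each awaiting sender would let each operation wake only the thread that can make progress.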

Solution
Tests

5. Implement in either C# or Java, using implicit or explicit monitors, the synchronizer simple thread pool executor, which executes the commands it receives in one of the threads it creates and manages for that purpose. The public interface for the synchronizer is the following:

public class SimpleThreadPoolExecutor {   
    public SimpleThreadPoolExecutor(int maxPoolSize, int keepAliveTime);   
    public boolean execute(Runnable command, int timeout) throws InterruptedException;   
    public void shutdown();   
    public boolean awaitTermination(int timeout) throws InterruptedException; 
} 

The maximum number of worker threads, maxPoolSize, and the maximum time that a thread may be inactive before terminating, keepAliveTime, are passed as arguments to the constructor of the class SimpleThreadPoolExecutor. The management of the worker threads by the synchronizer must adhere to the following criteria:

  • if the number of worker threads is below the specified maximum, a new worker thread is created when work is submitted, as long as there is no worker thread available to handle the work.
  • the worker threads must terminate after keepAliveTime milliseconds have passed without receiving any command.
  • the number of worker threads existing in the pool may vary between zero and maxPoolSize.

The threads that want to execute functions through the SimpleThreadPoolExecutor invoke the execute method, specifying the command to be executed. This method may block the calling thread, because it has to guarantee that the specified command is delivered to a worker thread for execution, and may terminate:

  • normally, returning true if the command was delivered to execution.
  • throwing a RejectedOperationException if the pool is in shutting down mode.
  • returning false if the waiting time specified as timeout elapses without the command being handed to a worker thread.
  • throwing an InterruptedException if the calling thread is interrupted.

Calling the method shutdown sets the pool state to shutting down and returns immediately. In this mode, every call to the method execute should throw a RejectedOperationException. Every submission made before the method is called should be handled normally.

The method awaitTermination allows the calling thread to synchronize with the conclusion of the shutdown process of the executor, that is, to wait until all the pending commands are accepted and all the active worker threads terminate. It terminates:

  • normally, returning true.
  • returning false, if the waiting time is exceeded before the shutdown process concludes.
  • throwing an InterruptedException if the wait is interrupted.

The implementation of this synchronizer should minimize the number of thread context switches that occur under the different circumstances.
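The worker management rules and the shutdown protocol can be sketched on an implicit monitor as follows. This is an illustration under assumptions, not the linked solution: timeouts are taken to be in milliseconds, java.util.concurrent.RejectedExecutionException stands in for the RejectedOperationException named above, the Request helper and the workerLoop and nextCommand methods are hypothetical names, and a worker counts as terminated as soon as it decides to exit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class SimpleThreadPoolExecutor {
    // Hypothetical helper: a command waiting to be picked up by a worker.
    private static final class Request {
        final Runnable command;
        boolean taken;
        Request(Runnable command) { this.command = command; }
    }

    private final int maxPoolSize;
    private final long keepAliveNanos;
    private final Deque<Request> pending = new ArrayDeque<>(); // guarded by this
    private int workers;          // live worker threads
    private int idle;             // workers waiting for a command
    private boolean shuttingDown;

    public SimpleThreadPoolExecutor(int maxPoolSize, int keepAliveTime) {
        if (maxPoolSize < 1) throw new IllegalArgumentException("maxPoolSize must be >= 1");
        this.maxPoolSize = maxPoolSize;
        this.keepAliveNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTime);
    }

    public synchronized boolean execute(Runnable command, int timeout)
            throws InterruptedException {
        if (shuttingDown) throw new RejectedExecutionException("pool is shutting down");
        // No unclaimed idle worker and room to grow: start a worker with this command.
        if (idle <= pending.size() && workers < maxPoolSize) {
            workers++;
            new Thread(() -> workerLoop(command)).start();
            return true;
        }
        Request r = new Request(command);
        pending.addLast(r);
        notifyAll();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        try {
            while (!r.taken) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) { pending.remove(r); return false; }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            if (r.taken) { Thread.currentThread().interrupt(); return true; }
            pending.remove(r);
            throw e;
        }
    }

    public synchronized void shutdown() {
        shuttingDown = true;
        notifyAll(); // idle workers re-check and retire once nothing is pending
    }

    public synchronized boolean awaitTermination(int timeout) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        while (!(shuttingDown && workers == 0)) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) return false;
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    private void workerLoop(Runnable first) {
        for (Runnable cmd = first; cmd != null; cmd = nextCommand()) {
            try { cmd.run(); } catch (RuntimeException e) { /* keep the worker alive */ }
        }
    }

    // Returns the next command, or null when the worker should terminate.
    private synchronized Runnable nextCommand() {
        long deadline = System.nanoTime() + keepAliveNanos;
        idle++;
        try {
            while (true) {
                Request r = pending.pollFirst();
                if (r != null) { r.taken = true; notifyAll(); return r.command; }
                long remaining = deadline - System.nanoTime();
                if (shuttingDown || remaining <= 0) { workers--; notifyAll(); return null; }
                try {
                    TimeUnit.NANOSECONDS.timedWait(this, remaining);
                } catch (InterruptedException ignored) { /* private thread: re-check state */ }
            }
        } finally {
            idle--;
        }
    }
}
```

Workers always poll the pending queue before checking keep-alive expiry or shutdown, so a command that was accepted for hand-off is not abandoned by a retiring worker.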

Solution
Tests

Semi Lock Free Synchronization

Lock Free Synchronization