Reaktive Architekturen mit RxJava Teil 2: Programmiermodelle zur asynchronen Datenverarbeitung

Dies ist Teil 2 einer Blog-Serie über reaktive Architekturen mit RxJava. Einführung und Übersicht finden sich hier. Die Grundlagen Reaktiver Systeme wurden in Teil 1: Grundlagen reaktiver Systeme behandelt.

Reaktive Systeme basieren auf einer event-getrieben, asynchronen Datenverarbeitung. Dies ermöglicht eine nachrichtengestützte Kommunikation über System- und Prozessgrenzen und führt zu entkoppelten Komponenten nach dem reaktiven Manifest. Neben einer verbesserten Skalierbarkeit und Fehlertoleranz führt dieser Ansatz zu hoch performanten Anwendungen durch eine asynchrone, push-basierte Datenverarbeitung.

Somit sind für die Programmierung reaktiver Systeme zwei Eigenschaften ausschlaggebend: Asynchronität und Reaktion auf Eventfolgen. Natürlich schließt sich an dieser Stelle direkt die Frage nach dem Programmiermodell in Java und der Gestaltung der Schnittstellen an. Tabelle 1 zeigt die unterschiedlichen Optionen in Abhängigkeit zur Synchronität von Aufrufen und der Anzahl der Ergebnisdatensätze.

Rückgabe eines Datensatzes Rückgabe mehrerer Datensätze
Synchrone Schnittstelle Object Iterable,
Stream
Asynchrone Schnittstelle Callback,
Future,
CompletableFuture
Callback,
Observable

Tabelle 1: Optionen zum Schnittstellendesign

Die Optionen differenzieren sich in den Einsatzgebieten und der Eignung zur Verarbeitung asynchroner Eventfolgen:

1) Synchrone Schnittstelle / Ein Datensatz:

Der einfachste Fall stellt natürlich eine synchrone Schnittstelle dar, bei der ein Methodenaufruf ein Objekt zurückliefert, welches direkt weiterverarbeitet werden kann. Beispiel 1 zeigt ein einfaches Laden und Aktualisieren eines Kundendatensatzes. Die Daten werden sequenziell geladen, verändert und zurückgeschrieben. Da die Verarbeitung auf einem Thread ausgeführt wird, können Fehler über Exceptions abgefangen werden.

try {
 	//lade Kundendaten				
	Customer customer = customerService.findById (customerId);
	//aktualisere Kundendaten
	customer.setAddress( ... );	
	//speichere Kundendaten
	customerService.update(customer);
}	
catch (RuntimeException ex) {
    //verarbeite Fehler
}

Beispiel 1: Synchrone Aktualisierung eines Kundendatensatzes

Das Programmieren mit blockierenden Aufrufen ist sicherlich der einfachste Ansatz und zudem das Standardprogrammiermodell in Java – als Beispiel sei nur JDBC genannt. Asynchronität verlangt hingegen nach anderen Ansätzen.

2) Asynchrone Schnittstelle / Ein Datensatz:

Eine asynchrone Schnittstelle kann mit Callbacks, Futures oder CompletableFutures umgesetzt werden.

a) Asynchrone Schnittstellen mit Callbacks

Die Übergabe von Funktionsreferenzen stellt dank Java 8 die einfachste Möglichkeit zur Umsetzung einer asynchronen Schnittstelle dar. Einer asynchronen Methode wird neben den Parametern noch eine Callback-Funktion zur Verarbeitung des Ergebnisses übergeben.

//lade Kundendaten
vertx.eventBus().send("CustomerService.findById", customerId, reply -> {
	//prüfe auf Fehler
	if (reply.succeeded()) {
		 //Customer aus der Antwort holen
		Customer customer = (Customer) reply.result().body();
		//aktualisere Kundendaten
		customer.setAddress( ... );		
		//Speichere Kundendaten
		vertx.eventBus().send("CustomerService.update", customer, reply -> {
		    //Fehlerprüfung und Ergebinsverarbeitung
		    //nochmals geschachtelt!!!
		});		
	} else {
		//Verarbeite Fehler
	}
});

Beispiel 2: Aktualisierung eines Kundendatensatzes mit Callbacks

Beispiel 2 illustriert die Verwendung asynchroner Schnittstellen zur event-basierten Kommunikation. Das Beispiel nutzt Ver.x, das einen Nachrichtenbus bereitstellt, um mit anderen Komponenten zu kommunizieren.
In Beispiel 2 wird der CustomerService über den Bus aufgerufen. An vertx.eventBus().send(…) wird die Adresse des aufzurufenden Services, die zu übermittelnden Daten und ein Callback übergeben. Die Callback-Funktion wird asynchron aufgerufen, wenn die Antwort des Services über den Nachrichtenbus zurückgesandt wurde. Der Callback prüft im ersten Schritt die Antwort auf eine Fehlersituation und extrahiert die Daten. Dann wird die Adresse verändert und es erfolgt wieder ein Aufruf über den Bus, um die Aktualisierung über den Dienst zu persistieren.
Die Beispiele 1 und 2 setzen dieselbe Logik um. Im Vergleich wird die gestiegene Komplexität durch Schachtelungen deutlich, die die Lesbarkeit des Codes massiv verringern und in einer Callback-Hölle münden.

b) Asynchrone Schnittstellen mit Future / CompletableFuture

Future / CompletableFuture dienen als Platzhalter für einen zukünftigen Wert, der von einer Funktion zurückgegeben wird. Futures sind jedoch inhärent blockierend, wie Beispiel 3 illustriert.

//lade Kundendaten
Future<Customer> customerFuture = customerService.findById(customerId);

//blockierender Aufruf, um die Kundendaten abzurufen
Customer customer = customerFuture.get();

//aktualisere Kundendaten
customer.setAddress( ... );
...

Beispiel 3: Aktualisierung eines Kundendatensatzes mit Futures

Um auf das eigentliche Customer-Objekt zur weiteren Verarbeitung zugreifen zu können, muss dieses über einen blockierenden get-Aufruf abgerufen werden. Damit unterstützt der Einsatz von Futures lediglich die Parallelisierung von Aufrufen in einem blockierenden Verarbeitungsmodell. Eine komplett asynchrone Datenverarbeitung auf Basis von Funktionsketten ist nicht möglich.

Die Problematik wurde in Java 1.8 durch CompletableFuture adressiert. CompletableFuture ermöglicht die Definition von Verarbeitungsketten, um Objekte nach Bereitstellung asynchron zu transformieren und zu verarbeiten.

Allerdings repräsentiert ein CompletableFuture immer noch EINEN Wert/Event, der/das in Zukunft bereitgestellt wird. Da eine Verarbeitung von Eventfolgen nicht möglich ist, eignen sich CompletableFutures daher nicht zur Umsetzung eines Handlers zur Eventverarbeitung. Eine Anbindung eines Dienstes an einen Nachrichtenbus lässt sich damit beispielsweise nicht realisieren.

3) Synchrone Schnittstelle / Mehrere Datensätze:

Die Verarbeitung von multiplen Werten ist in Java seit jeher über Iteratoren möglich. Zudem sind mit Java 8 Streams eingeführt worden. Java 8 Stream bieten eine interessante Möglichkeit zur Definition von Funktionsketten, um Transformationen und Filterungen auf Collections durchzuführen. Allerdings realisieren Streams einen pull-basierten Ansatz auf Werte einer Collection und können nicht mit asynchronen Methodenaufrufen genutzt werden. Zwar ist eine parallelisierte Verarbeitung möglich - dabei werden die Werte einer Collection zur Verarbeitung aber nur auf mehrere Threads verteilt. Eine Kombination von Streams und CompletableFuture ist nicht möglich. Somit sind Streams in asynchronen Szenarien nicht verwendbar.

4) Asynchrone Schnittstelle / Mehrere Datensatze

Sollen mehrere Datensätze asynchron zur reaktiven Programmierung verarbeitet werden – beispielsweise ein Service, der Eventströme über einen Eventbus verarbeitet – dann sind Callbacks die einzige Möglichkeit zur Umsetzung mit Java-Standardmitteln. Wie in Beispiel 2 dargestellt, führen Callbacks jedoch zu schlecht wartbarem Code.
Wünschenswert ist hingegen jedoch eine Lösung, die – ähnlich zu CompletableFutures und Streams – die Definition von funktionalen Verarbeitungsketten ermöglicht und zudem asynchon die Verarbeitung von Eventströmen unterstützt.
Eine Lösung bieten Frameworks wie RxJava, die einen event-basierten Benachrichtigungsmechanismus auf Basis des Observable-Patterns anbieten.

Im nächsten Teil dieser Blog-Serie wird RxJava als Framework zur reaktiven Programmierung vorgestellt.

Weitere Beispiele zur Gestaltung von asynchronen Schnittstellen sind hier im Projekt rxjava-katas in GitHub verfügbar.