EventStore : Stream, Projections et Souscriptions

EventStore est une base de données orientée événements. Le stockage s’effectue par piles d’événements immutables. Il permet de sauvegarder les événements d’un système de manière optimisée. On peut facilement ajouter un événement à une pile, puis la lire du début à la fin ou inversement très rapidement. Il fournit aussi un mécanisme de souscription, chaque événement ajouté à une pile est transmis à tous les abonnés de celle-ci. C’est donc un outil parfait pour les architectures de type Event Sourcing.
Les binaires fournissent le service de base de données lui-même, ainsi qu’une interface web de gestion. Le service est consommable via http ou TCP/IP. Une API C# est également disponible sur Nuget.
EventStore est conçu pour être “scalable”, on peut facilement configurer un cluster composé de différents nœuds. Nous ne rentrerons pas dans les détails ici.
Il est presque entièrement opensource et gratuit, seule une surcouche de gestion des clusters est payante (on peut tout de même gérer des clusters dans la version gratuite).
Le but de cet article est de découvrir les principales fonctionnalités d’EventStore à travers un exemple. Nous ne nous pencherons pas ici sur l’aspect Event Sourcing qui mériterait à lui seul une série d’article.
Stream
Un stream est une pile d’événements. En général, on range dans un stream unique tous les événements liés à une entité. Par exemple, voici à quoi pourrait ressembler un stream concernant un utilisateur :
Les événements sont immutables. Si des corrections sont nécessaires, on ajoute des événements supplémentaires pour rectifier le tir, et on garde un historique complet de ce qui s’est passé. Dans l’exemple ci-dessus, la suppression de l’utilisateur est une erreur, on ajoute donc un événement de réactivation.
Voici comment ajouter un événement à un stream via l’API C# :
public void AppendEvent(UserCreated userCreated) { var serializedEvent = JsonSerializer.SerializeToString(userCreated).ToUtf8Bytes(); connection.AppendToStreamAsync("user-" + userCreated.UserId, ExpectedVersion.Any, new[] { new EventData(Guid.NewGuid(), typeof (UserCreated).Name, true, serializedEvent, null) }, credentials).Wait(); }
Explication :
On sérialise notre objet d’événement dans un tableau de bytes, puis on pousse dans un stream portant le nom “user-id”. Ici, on ne gère pas la version d’événement (ExpectedVersion.Any) et on ne renseigne pas de meta-données (dernier paramètre du constructeur à null).
Projections
Les projections permettent d’effectuer des requêtes sur les flux d’événements.
Elles sont écrites en Javascript, ce qui donne une grande souplesse dans les traitements que l’on veut effectuer. Pour chaque événement ciblé, le code de la projection est exécuté.
Les projections peuvent être exécutées une fois ou en continu : elles traiteront alors les événements en temps réel.
Dans notre exemple, imaginons que nous voulons remonter la liste des utilisateurs avec leur nom. Nous exécutons la projection suivante (via l’interface d’administration) :
fromAll() .when({ $init: function() { return { count: 0, users : [] }; }, UserCreated: function(state, event){ state.count += 1; state.users.push({id : event.body.UserId , name : event.body.UserName}) }, UserNameChanged: function(state, event){ var targetUser = state.users.filter(function(user) { return user.id === event.body.UserId; })[0]; targetUser.name = event.body.NewUserName; } });
Explication :
fromAll() signifie que nous allons traiter tous les streams de la base courante.
La fonction init est, comme son nom l’indique, l’initialisation de la requête ; nous définissons ici l’objet de résultat qui est un compte du nombre d’utilisateurs et la liste de ces derniers. Le résultat est stocké dans un objet « state ».
On lie ensuite les types d’événements à des fonctions. Ici, UserCreated incrémente le nombre d’utilisateurs et ajoute l’utilisateur dans la liste. UserNameChanged modifie le nom d’un utilisateur déjà présent.
A l’exécution, la projection donne le résultat suivant :
{ "count": 3, "users": [ { "id": 10, "name": "Tata" }, { "id": 11, "name": "Gege" }, { "id": 12, "name": "Jojo" } ] }
EventStore et ReadModels
Même si les projections permettent de requêter à peu près n’importe quoi, elles trouvent leurs limites dans certains cas. Elles seront performantes pour des requêtes de type temporel (« tous les utilisateurs créés dans les 5 derniers jours ») mais pas pour des requêtes de recherche (« tous les utilisateurs dont le nom contient toto »).
Heureusement, EventStore fournit la possibilité de souscrire à des flux d’événements, on peut donc remplir des « read model » dans d’autres bases de données très facilement.
Toujours dans notre exemple, admettons que l’on souhaite effectuer de la recherche full text sur les utilisateurs. Une base de données orientée recherche semble adaptée. Au hasard : ElasticSearch. Avec EventStore, nous pouvons très facilement alimenter une base ElasticSearch au fil de l’eau en souscrivant aux événements de création et de modification d’utilisateurs.
Vous aurez sans doute reconnu le principe du CQRS : séparation de la lecture et de l’écriture. Ici, on écrit dans EventStore, et on lit dans ElasticSearch. On optimise ainsi ces deux aspects de façon indépendante.
Commençons par créer une projection qui ne remontera que les événements qui nous intéressent :
fromAll() .when({ UserCreated: function (state, event) { linkTo('UserProjectionStream', event); }, UserNameChanged: function (state, event) { linkTo('UserProjectionStream', event); } });
Explication :
Nous redirigeons tous les événements de type UserCreated et UserNameChanged vers un nouveau flux UserProjectionStream. Nous ne copions ici qu’une référence à l’événement originel, mais il est possible d’en faire une copie.
Nous obtenons donc notre nouveau stream, avec les événements dans l’ordre chronologique :
Nous devons aussi définir cette projection comme « continuous », ce qui veut dire qu’elle traitera les événements futurs sans s’interrompre. Ainsi, à chaque création ou modification d’utilisateur, le stream UserProjectionStream sera alimenté.
Il ne nous reste plus qu’à souscrire à ce flux pour pouvoir alimenter notre readModel.
Souscriptions
Les souscriptions permettent de s’abonner à des flux d’événements : on reçoit alors tous les événements sauvegardés dans le flux auquel on est abonné.
Il existe trois types de souscriptions :
- Volatile : les événements postérieurs à la souscriptions seront reçus
- CatchUp : on précise une position de départ dans le flux, on peut donc récupérer l’historique des événements du flux.
- Persisted : le serveur gère l’état de la souscription et un événement n’est envoyé qu’à un seul des abonnés.
Reprenons notre exemple, nous allons souscrire au flux UserProjectionStream pour ne recevoir que les événements de type UserCreated et UserNameChanged. Puis créer ou modifier nos documents ElasticSearch (le détail n’est pas montré ici) :
public class UserDenormalizer { private readonly IEventStoreConnection connection; private readonly IElasticSearchRepository elasticSearchRepository; public UserDenormalizer(IElasticSearchRepository elasticSearchRepository, IEventStoreConnection connection) { this.elasticSearchRepository = elasticSearchRepository; this.connection = connection; connection.SubscribeToAllFrom(Position.Start, true, EventAppeared); } private void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent) { if (resolvedEvent.Event.EventType.Equals(typeof (UserCreated).Name)) { var userCreatedEvent = JsonSerializer.DeserializeFromString<UserCreated>(Encoding.UTF8.GetString(resolvedEvent.Event.Data)); elasticSearchRepository.CreateUser(userCreatedEvent.UserId, userCreatedEvent.UserName); } else { var userNameChangedEvent = JsonSerializer.DeserializeFromString<UserNameChanged>(Encoding.UTF8.GetString(resolvedEvent.Event.Data)); elasticSearchRepository.UpdateUser(userNameChangedEvent.UserId, userNameChangedEvent.NewUserName); } } }
Explication :
Dans le constructeur, nous souscrivons au flux UserProjectionStream, en partant du premier événement, et en autorisant les événements liés (le stream auquel nous souscrivons est le résultat d’une projection, celui-ci ne contient que des références à des événements existants). Nous passons aussi une méthode qui va recevoir les événements : EventAppeared.
A la réception de l’événement, la méthode détermine s’il faut créer ou modifier un document en fonction du type de l’événement.
On a maintenant un système qui enregistre toutes les actions faites sur les utilisateurs, et qui alimente un index ElasticSearch au fur et à mesure. On peut imaginer que d’autres abonnés viennent se greffer à notre projection afin d’alimenter d’autres bases de données. On obtiendra donc un système capable de lire dans des bases “dénormalisées” (donc rapides), tout en conservant une cohérence en écriture (rien n’est perdu grâce au flux d’événement !).
That’s it !
EventStore est un outil très adapté à l’event sourcing, et les projections et les souscriptions offrent de nombreuses possibilités. Par exemple, développer un chat en s’appuyant sur lui est un jeu d’enfant.
Par nature, il permet de mettre en place des applications très modulaires car l’émission et les traitements des événements sont décorrélés.
Il est capable de gérer de très gros volumes de données, et est facilement “scalable”. On peut donc s’en servir sur de grosses applications.
Vous l’aurez compris, je vous recommande vivement d’y jeter un oeil !