(cont.)
Just to give some context on some of the components around the code structure.. I wrote this up around an earlier version of aggregate code. This generic bit simplifies things by removing the need of the Crud functions for each aggregate.
Domain ObjectsA domain object can be used as an aggregate by adding the event.AggregateRoot
struct and finish implementing event.Aggregate. The AggregateRoot implements logic for adding events after they are either Raised by a command or Appended by the eventstore Load or service ApplyFn methods. It also tracks the uncommitted events that are saved using the eventstore Save method.
type User struct {
Identity string ```json:"identity"`
CreatedAt time.Time
event.AggregateRoot
}
// StreamID for the aggregate when stored or loaded from ES.
func (a *User) StreamID() string {
return "user-" + a.Identity
}
// ApplyEvent to the aggregate state.
func (a *User) ApplyEvent(lis ...event.Event) {
for _, e := range lis {
switch e := e.(type) {
case *UserCreated:
a.Identity = e.Identity
a.CreatedAt = e.EventMeta().CreatedDate
/* ... */
}
}
}
Events
Events are applied to the aggregate. They are defined by adding the event.Meta
and implementing the getter/setters for event.Event
type UserCreated struct {
eventMeta event.Meta
Identity string
}
func (c *UserCreated) EventMeta() (m event.Meta) {
if c != nil {
m = c.eventMeta
}
return m
}
func (c *UserCreated) SetEventMeta(m event.Meta) {
if c != nil {
c.eventMeta = m
}
}
Reading Events from EventStore
With a domain object that implements the event.Aggregate
the event store client can load events and apply them using the Load(ctx, agg)
method.
// GetUser populates an user from event store.
func (rw *User) GetUser(ctx context.Context, userID string) (*domain.User, error) {
user := &domain.User{Identity: userID}
err := rw.es.Load(ctx, user)
if err != nil {
if err != nil {
if errors.Is(err, eventstore.ErrStreamNotFound) {
return user, ErrNotFound
}
return user, err
}
return nil, err
}
return user, err
}
OnX Commands
An OnX command will validate the state of the domain object can have the command performed on it. If it can be applied it raises the event using event.Raise() Otherwise it returns an error.
// OnCreate raises an UserCreated event to create the user.
// Note: The handler will check that the user does not already exsist.
func (a *User) OnCreate(identity string) error {
event.Raise(a, &UserCreated{Identity: identity})
return nil
}
// OnScored will attempt to score a task.
// If the task is not in a Created state it will fail.
func (a *Task) OnScored(taskID string, score int64, attributes Attributes) error {
if a.State != TaskStateCreated {
return fmt.Errorf("task expected created, got %s", a.State)
}
event.Raise(a, &TaskScored{TaskID: taskID, Attributes: attributes, Score: score})
return nil
}
Crud Operations for OnX Commands
The following functions in the aggregate service can be used to perform creation and updating of aggregates. The Update function will ensure the aggregate exists, where the Create is intended for non-existent aggregates. These can probably be combined into one function.
// Create is used when the stream does not yet exist.
func (rw *User) Create(
ctx context.Context,
identity string,
fn func(*domain.User) error,
) (*domain.User, error) {
session, err := rw.GetUser(ctx, identity)
if err != nil && !errors.Is(err, ErrNotFound) {
return nil, err
}
if err = fn(session); err != nil {
return nil, err
}
_, err = rw.es.Save(ctx, session)
return session, err
}
// Update is used when the stream already exists.
func (rw *User) Update(
ctx context.Context,
identity string,
fn func(*domain.User) error,
) (*domain.User, error) {
session, err := rw.GetUser(ctx, identity)
if err != nil {
return nil, err
}
if err = fn(session); err != nil {
return nil, err
}
_, err = rw.es.Save(ctx, session)
return session, err
}
Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.
type PA[T any] interface {
event.Aggregate
*T
}
// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
ctx, span := logz.Span(ctx)
defer span.End()
agg = new(A)
agg.SetStreamID(streamID)
if err = es.Load(ctx, agg); err != nil {
return
}
if err = event.NotExists(agg); err != nil {
return
}
if err = fn(ctx, agg); err != nil {
return
}
var i uint64
if i, err = es.Save(ctx, agg); err != nil {
return
}
span.AddEvent(fmt.Sprint("wrote events = ", i))
return
}
This lets me do something like this:
a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
return agg.OnUserRegister(nick, key)
})
I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.
Hi, I am playing with making an event sourcing database. Its super alpha but I thought I would share since others are talking about databases and such.
It’s super basic. Using tidwall/wal as the disk backing. The first use case I am playing with is an implementation of msgbus. I can post events to it and read them back in reverse order.
I plan to expand it to handle other event sourcing type things like aggregates and projections.
Find it here: sour-is/ev
@prologic@twtxt.net @movq@www.uninformativ.de @lyse@lyse.isobeef.org
Health Data
⌘ Read more
#event Upcomming Meetup in Copennhagen: algolab(the_art_of_live_coding) @ Støberiet / Computer Klub
Apple Event for 18 October 2021, 10:00 PDT, 13:00 EDT begins. Commentary will stream as replies to this twt. I might miss things here and there, as I will also be on a work meeting from 13:00 to 14:00 EDT.
Apple Store online down before today’s event. Less than two hours till it goes live!
@prologic@twtxt.net I am thinking on calling in sick to work. 😂 Every time I order an iPhone, I take the day off on delivery day. On Apple events I normally use my lunch and break times all combined, to watch them.
Apple’s event on Monday is bringing, as always, speculation to the table. One thing most outlets seem to agree is the introduction of an “M1X” chip, thought Apple might call it differently. M1X might also mean, M1(we don’t know what comes after, or next generation). Either way, I would really like to see the return of the 27” iMac, but I will not hold my breath. Nevertheless, Monday is going to be an exciting day for many, including me! 🍎
@prologic@twtxt.net Works permit, I will probably be twting about it. I try not to miss one single event.
“Join us for a special event.” Unleashed 😯
#event Tomorrow, Saturday October 2nd, I’m gonna be hosting a workshop at Processing Community Day CPH about Live Coding Visuals in Improviz. Only 5 spots left, so sign up now at: https://pcdcph.com
#event Upcoming Workshop / algolab: Music and Live Coding @ CPH Music Maker Space / facebook event
#event Upcoming Workshop / algolab: Visuals and Live Coding @ CPH Music Maker Space / facebook event
in some ways, tomo el fuego is an effort to break away from electron and javascript as the basis for application development. specifically p2p messaging, but there are a lot of general applications too. its true that i could have set this concern aside to focus on making the p2p things that i’ve been talking about over the last several years. i might have done something there, but i really don’t want to accept the status quo there. i want a much smaller foundation to build on than a web browser and an event library with V8 bolted on. dynamic 9-flavored p2p is coming, but there’s quite a bit more foundational work to do first.
fitness is a lifestyle, not an event
@lucidiot@tilde.town Agreeing that BuJo kind of saved my mind too. It now takes me about three months to fill up 251 pages with tasks, notes and events.