Searching yarn

Twts matching #event
Sort by: Newest, Oldest, Most Relevant
In-reply-to » Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.

(cont.)

Just to give some context on some of the components around the code structure.. I wrote this up around an earlier version of aggregate code. This generic bit simplifies things by removing the need of the Crud functions for each aggregate.

Domain Objects

A domain object can be used as an aggregate by adding the event.AggregateRoot struct and finish implementing event.Aggregate. The AggregateRoot implements logic for adding events after they are either Raised by a command or Appended by the eventstore Load or service ApplyFn methods. It also tracks the uncommitted events that are saved using the eventstore Save method.

type User struct {
  Identity string ```json:"identity"`

  CreatedAt time.Time

  event.AggregateRoot
}

// StreamID for the aggregate when stored or loaded from ES.
func (a *User) StreamID() string {
	return "user-" + a.Identity
}
// ApplyEvent to the aggregate state.
func (a *User) ApplyEvent(lis ...event.Event) {
	for _, e := range lis {
		switch e := e.(type) {
		case *UserCreated:
			a.Identity = e.Identity
			a.CreatedAt = e.EventMeta().CreatedDate
        /* ... */
		}
	}
}
Events

Events are applied to the aggregate. They are defined by adding the event.Meta and implementing the getter/setters for event.Event

type UserCreated struct {
	eventMeta event.Meta

	Identity string
}

func (c *UserCreated) EventMeta() (m event.Meta) {
	if c != nil {
		m = c.eventMeta
	}
	return m
}
func (c *UserCreated) SetEventMeta(m event.Meta) {
	if c != nil {
		c.eventMeta = m
	}
}
Reading Events from EventStore

With a domain object that implements the event.Aggregate the event store client can load events and apply them using the Load(ctx, agg) method.

// GetUser populates an user from event store.
func (rw *User) GetUser(ctx context.Context, userID string) (*domain.User, error) {
	user := &domain.User{Identity: userID}

	err := rw.es.Load(ctx, user)
	if err != nil {
		if err != nil {
			if errors.Is(err, eventstore.ErrStreamNotFound) {
				return user, ErrNotFound
			}
			return user, err
		}
		return nil, err
	}
	return user, err
}
OnX Commands

An OnX command will validate the state of the domain object can have the command performed on it. If it can be applied it raises the event using event.Raise() Otherwise it returns an error.

// OnCreate raises an UserCreated event to create the user.
// Note: The handler will check that the user does not already exsist.
func (a *User) OnCreate(identity string) error {
    event.Raise(a, &UserCreated{Identity: identity})
    return nil
}

// OnScored will attempt to score a task.
// If the task is not in a Created state it will fail.
func (a *Task) OnScored(taskID string, score int64, attributes Attributes) error {
	if a.State != TaskStateCreated {
		return fmt.Errorf("task expected created, got %s", a.State)
	}
	event.Raise(a, &TaskScored{TaskID: taskID, Attributes: attributes, Score: score})
	return nil
}
Crud Operations for OnX Commands

The following functions in the aggregate service can be used to perform creation and updating of aggregates. The Update function will ensure the aggregate exists, where the Create is intended for non-existent aggregates. These can probably be combined into one function.

// Create is used when the stream does not yet exist.
func (rw *User) Create(
  ctx context.Context,
  identity string,
  fn func(*domain.User) error,
) (*domain.User, error) {
	session, err := rw.GetUser(ctx, identity)
	if err != nil && !errors.Is(err, ErrNotFound) {
		return nil, err
	}

	if err = fn(session); err != nil {
		return nil, err
	}

	_, err = rw.es.Save(ctx, session)

	return session, err
}

// Update is used when the stream already exists.
func (rw *User) Update(
  ctx context.Context,
  identity string,
  fn func(*domain.User) error,
) (*domain.User, error) {
	session, err := rw.GetUser(ctx, identity)
	if err != nil {
		return nil, err
	}

	if err = fn(session); err != nil {
		return nil, err
	}

	_, err = rw.es.Save(ctx, session)
	return session, err
}

⤋ Read More
In-reply-to » Hi, I am playing with making an event sourcing database. Its super alpha but I thought I would share since others are talking about databases and such.

Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.

type PA[T any] interface {
	event.Aggregate
	*T
}

// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
	ctx, span := logz.Span(ctx)
	defer span.End()

	agg = new(A)
	agg.SetStreamID(streamID)

	if err = es.Load(ctx, agg); err != nil {
		return
	}

	if err = event.NotExists(agg); err != nil {
		return
	}

	if err = fn(ctx, agg); err != nil {
		return
	}

	var i uint64
	if i, err = es.Save(ctx, agg); err != nil {
		return
	}

	span.AddEvent(fmt.Sprint("wrote events = ", i))

	return
}

fig. 1

This lets me do something like this:

a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
		return agg.OnUserRegister(nick, key)
})

fig. 2

I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.

⤋ Read More

Hi, I am playing with making an event sourcing database. Its super alpha but I thought I would share since others are talking about databases and such.

It’s super basic. Using tidwall/wal as the disk backing. The first use case I am playing with is an implementation of msgbus. I can post events to it and read them back in reverse order.

I plan to expand it to handle other event sourcing type things like aggregates and projections.

Find it here: sour-is/ev

@prologic@twtxt.net @movq@www.uninformativ.de @lyse@lyse.isobeef.org

⤋ Read More

Apple Event for 18 October 2021, 10:00 PDT, 13:00 EDT begins. Commentary will stream as replies to this twt. I might miss things here and there, as I will also be on a work meeting from 13:00 to 14:00 EDT.

⤋ Read More

Apple’s event on Monday is bringing, as always, speculation to the table. One thing most outlets seem to agree is the introduction of an “M1X” chip, thought Apple might call it differently. M1X might also mean, M1(we don’t know what comes after, or next generation). Either way, I would really like to see the return of the 27” iMac, but I will not hold my breath. Nevertheless, Monday is going to be an exciting day for many, including me! 🍎

⤋ Read More

in some ways, tomo el fuego is an effort to break away from electron and javascript as the basis for application development. specifically p2p messaging, but there are a lot of general applications too. its true that i could have set this concern aside to focus on making the p2p things that i’ve been talking about over the last several years. i might have done something there, but i really don’t want to accept the status quo there. i want a much smaller foundation to build on than a web browser and an event library with V8 bolted on. dynamic 9-flavored p2p is coming, but there’s quite a bit more foundational work to do first.

⤋ Read More