We have here few use cases:
- Renaming / moving event class
- Modification of existing event (ex. adding new attribute)
In order to support both use case we have done example customizations:
- Renaming / moving event class
In such use case we had to implement custom repository:
module Infra
# Clone of RailsEventStoreActiveRecord::EventRepository
# with changes:
# - constructor is accepting 'events_name_mapping' keyword with hash
# configuration of renaming mapping
# - with modified version of build_event_entity method, which is using this mapping
# - with modified version of read_all_streams_forward method, with support for event_types
# (used in rebuilding of read models, to reduce scope)
class EventRepository < RailsEventStoreActiveRecord::EventRepository
def initialize(events_rename_mapping: {})
@events_rename_mapping = events_rename_mapping
super()
end
# improved version, please take a look documentation on top
def read_all_streams_forward(start_event_id, count, event_types)
stream = adapter
unless start_event_id.equal?(:head)
starting_event = adapter.find_by(event_id: start_event_id)
stream = stream.where('id > ?', starting_event)
end
scope = stream.order('id ASC')
scope = scope.where(event_type: event_types) if event_types
scope = scope.limit(count)
scope.map(&method(:build_event_entity))
end
private
attr_reader :events_rename_mapping
# improved version, please take a look documentation on top
def build_event_entity(record)
return nil unless record
event_type = events_rename_mapping.fetch(record.event_type) { record.event_type }
event_type.constantize.new(
event_id: record.event_id,
metadata: record.metadata,
data: record.data
)
end
end
end
Configure using of this repository by our configuration:
EVENTS_RENAME_MAPPING = {
# Example mapping in case of refactoring (move or rename)
"Loans::Events::LoanGivenEvent" => "Loans::Events::SomeGivenEvent"
}
class CoreConfiguration
def initialize(repository: Infra::EventRepository.new(events_rename_mapping: EVENTS_RENAME_MAPPING),
event_store: RailsEventStore::Client.new(repository: repository),
command_bus: Infra::CommandBus.new,
command_injector: Infra::CommandInjector.new(command_bus: command_bus))
@event_store = event_store
@command_bus = command_bus
@command_injector = command_injector
configure_aggregate_root(event_store)
setup_event_handler_strategy
setup_read_models(event_store)
register_event_handlers(event_store)
register_command_handlers(command_bus)
end
def configure_aggregate_root(event_store)
AggregateRoot.configure do |config|
config.default_event_store = event_store
end
end
...
attr_reader :event_store,
:command_bus,
:all_command_handlers,
:all_read_models,
:all_event_handlers,
:command_injector
end
- modification of existing event (ex. adding new attribute, changing type of existing one, etc)
module Loans
module Events
class LoanGivenEvent < ::Infra::Event
version 4
attribute :loan_number, Loans::Types::LoanNumber
attribute :loan_conditions, Loans::Types::LoanConditions
def self.convert_from_v1_to_v2(event)
puts "\n#{self} convert: v1 -> v2\n"
puts event.inspect
event
end
def self.convert_from_v2_to_v3(event)
puts "\n#{self} convert: v2 -> v3\n"
puts event.inspect
event
end
def self.convert_from_v3_to_v4(event)
puts "\n#{self} convert: v3 -> v4\n"
puts event.inspect
event
end
end
end
end
by default version is equal 1 implicitly:
module Loans
module Events
class LoanGivenEvent < ::Infra::Event
attribute :loan_number, Loans::Types::LoanNumber
attribute :loan_conditions, Loans::Types::LoanConditions
end
end
end
module Infra
class Event < Dry::Struct
include ::Base::Contracts
class VersionConverter
include ::Base::Contracts
SpecEventAttributes = KeywordArgs[event_id: String, data: Hash, metadata: Hash]
Contract SpecEventAttributes, ClassOf[Event] => SpecEventAttributes
def call(event_attributes, klass)
event_version = event_version(event_attributes, klass)
outdated_versions = (event_version..(klass.version - 1)).to_a
outdated_versions.inject(event_attributes) do |attributes, current_version|
upgrade(klass, attributes, from: current_version)
end
end
private
Contract SpecEventAttributes, Any => Integer
def event_version(event_attributes, _klass)
event_attributes.dig(:metadata, :version) || INITIAL_VERSION
end
Contract ClassOf[Event], SpecEventAttributes, KeywordArgs[from: Integer] => SpecEventAttributes
def upgrade(klass, event_attributes, from:)
event_version = from
upgraded_event_version = event_version + 1
new_data =
klass
.method("convert_from_v#{event_version}_to_v#{upgraded_event_version}")
.call(event_attributes[:data])
event_attributes
.merge(data: new_data)
.deep_merge(metadata: { version: upgraded_event_version })
.slice(:data, :metadata, :event_id)
end
end
INITIAL_VERSION = 1
Contract Or[nil, Integer] => Integer
def self.version(version = nil)
if version
@version = version
else
@version || INITIAL_VERSION
end
end
...
end
end