XDB Proposal & Design
This is the design and proposal doc of the XDB project. It starts with the goal of the project, then the high-level design with the tech stack and how all the components work together, followed by a detailed design section with an example.
The doc quotes the article "The Modern Transactional Stack" by Yoko Li, Martin Casado, and Satish Talluri several times.
- 09/24/2023: renamed `objectId` to `globalAttributePkValue` to make clearer what it is
- 10/04/2023: changed to implement the local queue in XDB by default, and use Pulsar as optional
XDB is not a new database. It's "eXtending a database with an async programming model".
This means nothing will be dropped from the database that it extends: all the existing functionalities of the database are kept. With XDB, a database provides both synchronous and asynchronous programming experiences.
Internally, XDB uses database CDC (change data capture) + a message queue as its implementation, which is a popular design pattern for async programming in the industry. From another point of view, XDB therefore creates an abstraction of this design pattern, so that users don't need to implement the pattern themselves (which requires dealing with a lot of low-level details of a database).
XDB aims to extend any database into an application logic transactional platform (ALTP), as defined in the article "The Modern Transactional Stack", without creating a new database.
As of today, there are many excellent database products -- from SQL to NoSQL, and even "NewSQL". They all provide "synchronous" APIs for applications. For example, an application makes a request to insert a record/row/document, and the database processes it and responds with the result synchronously.
This is great but far from enough. Real-world applications often have to deal with "asynchronous processes". For example, in e-commerce, placing an order means inserting some database records first, then calling external services to send emails and reminders, or tracking the order status in the background.
Those operations after inserting the database records are "asynchronous":
- There is no built-in/easy way to atomically insert database records and also call external services
- Communicating with other services often needs backoff retry for resiliency (eventual consistency)
- It may need to wait for some duration (e.g. sending a reminder often needs a "durable timer")
- The operations need to be orchestrated in some sequence based on business needs
Additionally, those async operations are usually coupled with databases, because databases are usually the "source of truth" for reading/writing data. For example, we may want to update a record if the user clicks the link in a reminder email, or when the order status moves from "shipped" to "delivered".
Today, to build a reliable and scalable async process:
- People put queues in place to represent different steps, and use consumer workers to execute the steps asynchronously
- People build their own "durable timer services" to persist timers, or use cron jobs that run periodically as a hack/workaround
- People use "workflow" services or programming models like Step Functions/Conductor/iWF to orchestrate the API calls to databases/services
- People use database CDC (change data capture) + a message queue to build async logic after database changes
Those solutions are complicated:
- Business logic is scattered across different places, which makes it hard to maintain
- Developers have to deal with a lot of low-level APIs (like the CDC message schema)
- It may require a lot of work to sync/transport data between different services and databases
The problem is stated in a different way in "The Modern Transactional Stack":
In fact, the transactional database (generally called OLTP — short for online transaction processing — database) has been so central to application development that, over time, it consumed more and more application functionality. However, microservices and other modern application architectures introduced new complexities into application design: Developers needed to manage data across different services and ensure consistency between them, which forced them to build complex data synchronization and processing mechanisms in-house.
And
LAMP on a database wasn’t sufficient for scale. And a giant hairball of queues and retry logic is too brittle. To deal with this, we’ve seen, over the last few years, the emergence of new solutions that bring sanity back to transactional logic. They can be roughly categorized as either workflow-centric approaches or database-centric approaches.
What if a database and an async programming framework become the same service? Then
- Users will have an async framework to build reliable process
- No need to sync/transport data between databases and others
The iWF programming model is a perfect fit for this idea. It could work seamlessly with any database.
Here is a quick introduction to the async programming model of iWF (Indeed Workflow Engine) if you are not familiar with it. iWF is a production-proven workflow engine that is powerful enough to build complicated use cases, yet easy to learn and use. It provides both a programming framework (via SDKs) and a service.
In the iWF framework, users define a "persistence schema" for storing custom data as "data attributes".
The persistence is accessible to the other two basic components of an iWF workflow: "WorkflowState" and "RPC". Users define business logic as code in WorkflowState and RPC, which read and write the data of the persistence.
Here is a Python example (Python is used only because it is shorter; Java and Golang SDKs are also available in iWF, and any language can easily implement the iWF protocol):
```python
class SubmitState(WorkflowState[Form]):
    def execute(self, ctx: WorkflowContext, input: Form, command_results: CommandResults, persistence: Persistence,
                communication: Communication,
                ) -> StateDecision:
        persistence.set_data_attribute(data_attribute_form, input)
        persistence.set_data_attribute(data_attribute_status, "waiting")
        print(f"API to send verification email to {input.email}")
        return StateDecision.single_next_state(VerifyState)


class VerifyState(WorkflowState[None]):
    def wait_until(self, ctx: WorkflowContext, input: T, persistence: Persistence, communication: Communication,
        ...

    def execute(self, ctx: WorkflowContext, input: T, command_results: CommandResults, persistence: Persistence,
        ...


class UserSignupWorkflow(ObjectWorkflow):
    def get_workflow_states(self) -> StateSchema:
        return StateSchema.with_starting_state(SubmitState(), VerifyState())

    def get_persistence_schema(self) -> PersistenceSchema:
        return PersistenceSchema.create(
            PersistenceField.data_attribute_def(data_attribute_form, Form),
            PersistenceField.data_attribute_def(data_attribute_status, str),
            PersistenceField.data_attribute_def(data_attribute_verified_source, str),
        )

    def get_communication_schema(self) -> CommunicationSchema:
        ...

    @rpc()
    def verify(
        self, source: str, persistence: Persistence, communication: Communication
    ) -> str:
        status = persistence.get_data_attribute(data_attribute_status)
        if status == "verified":
            return "already verified"
        persistence.set_data_attribute(data_attribute_status, "verified")
        persistence.set_data_attribute(data_attribute_verified_source, source)
        communication.publish_to_internal_channel(verify_channel)
        return "done"
```
The iWF programming model is simple to understand. Here is a brief summary:
- WorkflowState is the basic unit of an async process (called a workflow). It contains two methods for user code to implement: "waitUntil" and "execute".
- If implemented, "waitUntil" is invoked by the iWF server first, returning some commands for the server to wait for. Once the commands are completed, "execute" is invoked to return a StateDecision.
- The commands can be a TimerCommand (a durable timer), a ChannelCommand (waiting for some data), or a combination of them.
- A StateDecision can go to the next states, complete/fail the workflow, etc.
- RPC is for clients to interact with the workflow after it has started. It is also implemented by user code and invoked by the server.
- Persistence is the data that can be accessed in the above WorkflowState/RPC code.
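The control flow summarized above (waitUntil, then commands, then execute, then a StateDecision) can be mimicked by a toy single-process interpreter. This is only an illustration under simplifying assumptions: the `State`/`Decision` classes are hypothetical, commands are plain strings that "complete" immediately, and persistence is a dict. The real iWF server persists every step and calls the worker over REST.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Decision:
    next_states: List[str] = field(default_factory=list)  # empty list: this thread stops
    complete_with: Optional[str] = None  # set to complete the whole workflow


class State:
    def wait_until(self, persistence: Dict[str, object]) -> Optional[List[str]]:
        return None  # not implemented: the server skips waiting

    def execute(self, persistence: Dict[str, object], results: List[str]) -> Decision:
        raise NotImplementedError


class Submit(State):
    def execute(self, persistence, results):
        persistence["status"] = "waiting"
        return Decision(next_states=["verify"])


class Verify(State):
    def wait_until(self, persistence):
        return ["channel:verify"]  # ask the "server" to wait for this command

    def execute(self, persistence, results):
        persistence["status"] = "verified" if "channel:verify" in results else "waiting"
        return Decision(complete_with=str(persistence["status"]))


def run(states: Dict[str, State], start: str) -> Optional[str]:
    persistence: Dict[str, object] = {}
    pending = [start]
    while pending:
        state = states[pending.pop(0)]
        commands = state.wait_until(persistence)
        # Toy assumption: every requested command completes immediately.
        results = list(commands) if commands else []
        decision = state.execute(persistence, results)
        if decision.complete_with is not None:
            return decision.complete_with
        pending.extend(decision.next_states)
    return None


print(run({"submit": Submit(), "verify": Verify()}, "submit"))  # verified
```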
You can read more details in the iWF wiki.
The "data attributes" in iWF are stored in iWF, separately from users' databases. This does not work well for applications:
- Data is deleted after the workflow execution is completed (after some retention period)
- The persistence is just like a simple KV store, which cannot be used as a real database and is not comparable with a modern database: it lacks indexes, aggregation, views, multi-row transactions, etc. See the iWF wiki for more details about the pros and cons of the persistence.
- It's tricky and complicated to integrate with other databases, which are often used as the "source of truth" for the business.
As stated in "The Modern Transactional Stack":
- Business data (“data”) refers to the business-critical data traditionally stored in an OLTP database for persistence and processing (e.g. user profile info such as name, address, credit score, etc.).
- Application state refers to the current state of the system; the application state is determined by a value stored in a data storage system and which step the program’s execution is on in a finite state machine (e.g. the state of an order, such as “order received,” “inventory checked,” “credit checked,” “shipped,” “returned”).
- Business logic refers to the part of the program that deals with how the application actually works or what it does, instead of execution details (e.g. “If user_income > $100K & credit_score >650 ⇒ mortgage_approved = TRUE”).
workflow engines work primarily on application state rather than the business data, and often require some complexity when integrating with traditional databases.
It would be much nicer if the persistence in the iWF programming model could be mapped directly to a "real database", rather than being a separate storage outside of the database. All the reads/writes in the programming model would then just operate on the database that users otherwise have to sync to when using workflow engines.
Based on the above example, what if users define the PersistenceSchema like this, with additional information to associate it with their database?
```python
class UserSignupWorkflow(ObjectWorkflow):
    def get_workflow_states(self) -> StateSchema:
        ...

    def get_persistence_schema(self) -> PersistenceSchema:
        return PersistenceSchema.create(
            # each data attribute maps to a column (col_name) of the table
            PersistenceField.data_attribute_def(data_attribute_form, Form),
            PersistenceField.data_attribute_def(data_attribute_status, str),
            PersistenceField.data_attribute_def(data_attribute_verified_source, str),
            # the table that backs the persistence, and its primary key
            table_name="user",
            primary_key="user_id",
            primary_key_type="string",
        )

    ...
```
And they have a table `user` in a real database (like MySQL/Postgres):
Column | Type | Description |
---|---|---|
user_id(PK) | string | the user id |
status | string | "verified" or "waiting" |
form | json | other static info from submission like email address |
source | string | how does the user verify the account |
This means that the same workflow code will operate directly on the database:
- Each workflow execution can be mapped to a row of table "user"
- Attributes "data_attribute_form", "data_attribute_status" and "data_attribute_verified_source" are mapped to the columns of the table
Then the rest of the code is mostly the same, but just operating on the database directly:
- Any `persistence.get_data_attribute(...)` reads from the table row
- Any `persistence.set_data_attribute(...)` writes into the table row
- The persistence reads/writes in a WorkflowState/RPC can be atomic if the locking persistence policy is used
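Here is a minimal sketch of what "operating on the table row" could mean, assuming a hypothetical `RowPersistence` helper over SQLite (not the XDB SDK): reads select one column of the row identified by the primary key, and all writes made during one state/RPC invocation are buffered and committed in a single transaction, which is one way to get the atomicity described above.

```python
import sqlite3
from typing import Any, Dict


class RowPersistence:
    """Hypothetical helper: maps attributes to columns of one row of a table."""

    def __init__(self, conn: sqlite3.Connection, table: str, pk_col: str, pk_value: str):
        self.conn, self.table, self.pk_col, self.pk_value = conn, table, pk_col, pk_value
        self.pending: Dict[str, Any] = {}

    def get(self, column: str) -> Any:
        if column in self.pending:  # read-your-writes within one invocation
            return self.pending[column]
        # Identifiers come from the trusted schema definition, never from user input.
        row = self.conn.execute(
            f"SELECT {column} FROM {self.table} WHERE {self.pk_col} = ?", (self.pk_value,)
        ).fetchone()
        return None if row is None else row[0]

    def set(self, column: str, value: Any) -> None:
        self.pending[column] = value

    def commit(self) -> None:
        if not self.pending:
            return
        assignments = ", ".join(f"{c} = ?" for c in self.pending)
        with self.conn:  # all buffered column updates land in one transaction
            self.conn.execute(
                f"UPDATE {self.table} SET {assignments} WHERE {self.pk_col} = ?",
                (*self.pending.values(), self.pk_value),
            )
        self.pending.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user (user_id TEXT PRIMARY KEY, status TEXT, source TEXT)")
with conn:
    conn.execute("INSERT INTO user VALUES ('userId123', 'waiting', NULL)")
p = RowPersistence(conn, "user", "user_id", "userId123")
p.set("status", "verified")
p.set("source", "email")
p.commit()
print(conn.execute("SELECT status, source FROM user").fetchone())  # ('verified', 'email')
```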
Interestingly, this idea seems to match the prediction in "The Modern Transactional Stack":
The other way is to tie the workflow engine to a database that is aware of the workflow transactional semantics. This hasn’t quite happened yet, but it’s not hard to believe it will.
To wrap up the goal of XDB, these are the concepts that XDB will implement and expose to users:
- XDB application: the user application of XDB
  - worker: part of the XDB application; the implementation of the process using the XDB programming model and XDB SDKs. It exposes REST APIs to be called by the XDB service, acting as the "worker" of the process.
  - client: another part of the application that needs to interact with the process: start, stop, waitForCompletion, getResults, invoking RPCs, etc.
- XDB service: the service of XDB, as an extension of the user database
- Database: a regular OLTP database that XDB extends to provide this XDB programming model
Within the programming model:
- Process: the top-level concept of the programming model (just like "Workflow" in iWF or other workflow engines). It contains all the other elements, including business data and logic, all defined as code in the developers' native programming languages.
  - ProcessExecution: an execution of a process.
- Persistence: the data that is accessible during the execution of the process, by AsyncState and RPC.
  - LocalAttribute: data that only lives during the lifetime of a single ProcessExecution.
  - GlobalAttribute: data that lives beyond the lifetime of a ProcessExecution and is shared across multiple ProcessExecutions.
- Communication:
  - LocalQueue: a message queue that is accessed during the execution of the process.
  - GlobalQueue: a message queue that lives beyond the lifetime of a ProcessExecution and is shared across multiple ProcessExecutions.
- AsyncState: a business logic unit that is invoked by the XDB server and runs asynchronously.
  - WaitUntil Method: an optional method for developers to implement, which returns a CommandRequest to tell the XDB service to wait for something. It is invoked by the XDB service and executed in the worker.
    - CommandRequest: an object returned from WaitUntil to tell XDB what commands to wait for
      - TimerCommand: a command to wait for a durable timer to fire
      - LocalQueueCommand: a command to wait for some messages to consume in a LocalQueue
      - GlobalQueueCommand: a command to wait for some messages to consume in a GlobalQueue
      - WaitingType: because a command request can contain a list of commands, the waiting type tells how to wait for their combination (AllOf, AnyOf, or more complex combinations of Any and All)
  - Execute Method: a required method for developers to implement, which returns a StateDecision to tell what comes next. It is invoked by the XDB service and executed in the worker.
    - StateDecision: a StateDecision can be:
      - Going to a single next AsyncState, as sequenced steps in a single thread
      - Going to multiple next AsyncStates, as in-parallel threads
      - Completing/failing the ProcessExecution
      - DeadEnd, which just stops the current thread only
- RPC: a business logic unit acting as an interface of the ProcessExecution. It's invoked by a client and executed in the worker. Unlike AsyncState, it executes synchronously so that it can return results to the client call.
  - RPC has the same access to Persistence, LocalQueue and GlobalQueue, and can even start new threads of AsyncState.
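One way to read "WaitingType" is as a predicate over the commands completed so far. The sketch below evaluates AllOf, AnyOf, and a nested combination of them. The `AllOf`/`AnyOf` classes and the string command ids are hypothetical, chosen only to illustrate the semantics, not the XDB API.

```python
from dataclasses import dataclass
from typing import Set, Tuple, Union


@dataclass(frozen=True)
class AllOf:
    items: Tuple["Condition", ...]


@dataclass(frozen=True)
class AnyOf:
    items: Tuple["Condition", ...]


# A condition is either a single command id or a nested combination of conditions.
Condition = Union[str, AllOf, AnyOf]


def is_satisfied(cond: Condition, completed: Set[str]) -> bool:
    if isinstance(cond, str):
        return cond in completed
    if isinstance(cond, AllOf):
        return all(is_satisfied(c, completed) for c in cond.items)
    return any(is_satisfied(c, completed) for c in cond.items)


# "Wait for the 24h timer, OR for both the 'verify' and 'approve' queue messages."
request = AnyOf(("timer-24h", AllOf(("queue:verify", "queue:approve"))))
print(is_satisfied(request, {"queue:verify"}))                   # False
print(is_satisfied(request, {"queue:verify", "queue:approve"}))  # True
print(is_satisfied(request, {"timer-24h"}))                      # True
```

The service would re-evaluate the predicate whenever a timer fires or a message arrives, and invoke the Execute method once it becomes true.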
"GlobalAttribute" is essentially the new idea we just discussed: it maps directly to the user database's tables, so that it can store "Business Data" as described in "The Modern Transactional Stack".
"LocalAttribute", on the other hand, is the same as "Data Attribute" in iWF. It's for storing "application state".
Though XDB is "reusing" the programming model, some concepts are renamed based on feedback/lessons from people using iWF, and on the fact that XDB will operate directly on database tables, which usually represent business "objects".
- Workflow --> Process
- WorkflowId --> ProcessId
- WorkflowRunId --> ProcessExecutionId
- WorkflowExecution --> ProcessExecution
- WorkflowState --> AsyncState
- InternalChannel --> LocalQueue
- DataAttribute --> LocalAttribute
Also, things like "StateExecutionLocal" and "RecordEvents" are dropped, as they turned out not to be very useful.
Side Note: I will write a story in the future about why the term workflow is changed to process. Long story short:
- It's overloaded by other domain-specific technologies
- "Workflow" is just one category of use cases, while "process" is more generic as a technical concept (think about an OS process; this is a distributed application process).
- Even the Temporal team has admitted that "Workflow" is not a good term, and they now call it "DurableExecution".
Some things will/may be added in the future. To keep this proposal simple, they are only briefly listed:
- Read/write with transaction across multiple tables of the database
- Caching
- GlobalAttribute that is linked to other external storage like S3
Note that some new functionalities won't be implemented in the first phase of the project and are too much to cover in this design doc. But they should be natural to implement once you understand the core design of XDB.
This section describes the high-level design of this project. It starts from XDB users' point of view (what the interaction between XDB applications and the XDB service is), and then goes to the architecture of the XDB service (how the XDB service interacts with its dependencies).
Finally, we discuss the actual dependencies that this project will use.
Here are some more concept names that we will be using:
- XDB API service: part of an XDB service; the API service handles synchronous requests from clients
- XDB async service: another part of an XDB service; the async service implements the asynchronous parts, like invoking AsyncState APIs with backoff retry, durable timers, etc.
- Database: the user database that XDB depends on and extends
- CDC: a dependency of the XDB service; the change data capture of the database
- Message Queue: a dependency of the XDB service; the message queue that XDB uses for consuming CDC streams and for implementing backoff retry and durable timers
From the point of view of XDB's end users (developers), the XDB service is just a black box.
To implement an XDB application, they implement the async process as a worker which exposes REST APIs. The XDB service invokes those APIs. The XDB service also exposes REST APIs to start/stop/etc. processes, which are invoked by clients that need to interact with the process.
Below is the sequence diagram:
Assume the process contains a starting AsyncState. When the process is started, the XDB service executes that AsyncState.
The `waitUntil` API is invoked, if implemented, to return some commands. After the commands are completed, the `execute` API is invoked to return a decision. Based on the decision, the XDB service continues, e.g. by executing the next AsyncStates.
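On the worker side, these two callbacks can be served by a small dispatcher. This sketch assumes a hypothetical JSON request shape (`api`, `stateId`, `input`) rather than the real xdb-apis/iWF IDL, and it leaves out the HTTP layer: any web framework could pass the decoded request body to `handle` and return its result as the response body.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry: stateId -> {"waitUntil": fn, "execute": fn}
Handlers = Dict[str, Dict[str, Callable[[Any], Dict[str, Any]]]]


def handle(handlers: Handlers, body: str) -> str:
    request = json.loads(body)
    state = handlers.get(request["stateId"])
    if state is None or request["api"] not in state:
        return json.dumps({"error": "unknown state or api"})
    return json.dumps(state[request["api"]](request.get("input")))


handlers: Handlers = {
    "VerifyState": {
        # waitUntil returns the commands for the XDB service to wait for
        "waitUntil": lambda _input: {"commands": ["timer:24h", "queue:verify"], "waitingType": "AnyOf"},
        # execute returns the decision about what to do next
        "execute": lambda _input: {"decision": "complete"},
    },
}

print(handle(handlers, json.dumps({"api": "execute", "stateId": "VerifyState"})))
# {"decision": "complete"}
```

Because the worker keeps no state between calls, any worker instance can serve any request, which is what makes the push model discussed next workable.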
Note that the XDB service is essentially a "rewrite" of the iWF server, while keeping the SDKs and the protocol between server and SDKs (the "xdb-apis" repo is a fork of the "iwf-idl" repo; the same goes for the SDKs).
Side Note: instead of the AsyncService making callbacks, another way is to let workers pull tasks from the AsyncService to process. Reasons we do "push" rather than "pull":
- Push is just much easier to implement for now.
- XDB workers are fully stateless, which makes it possible to use push (some others, like Cadence workers, are semi-stateless, so they have to implement pulling).
- XDB will implement pull when the time comes.
Now dive into the implementation of XDB service:
There are two main parts of the XDB service: the API service and the Async service.
The API service is responsible for synchronous API requests, like starting/stopping a process, from clients that need to interact with the process.
The API service depends on the database. For example, when starting a process, it writes some records in the database to record that a process is started with an AsyncState to execute.
Then the record of the AsyncState to execute is delivered to a MessageQueue via CDC. The AsyncService then invokes the worker's REST APIs (exposed via the XDB SDK).
A quick overview with some details to help you understand:
- The database has to "install" some XDB table schemas in order to run an XDB service. As mentioned above, it needs tables for ProcessExecutions and AsyncStateExecutions as "XDB system tables".
- The new XDB system tables work together with the user's other tables for their business. XDB accesses both the user's custom tables for the Persistence API, and the XDB system tables for the rest.
- For example, when the AsyncService invokes an AsyncState's execute API, it needs to provide the requested local and global attributes. For global attributes, the AsyncService reads from the user's custom tables. The execute API returns a decision to update the local/global attributes and also to move to the next AsyncStates. The AsyncService then atomically updates the values in the user's custom tables and inserts a new record for a new AsyncStateExecution.
- The new AsyncStateExecution record is then delivered to the MessageQueue again.
- The AsyncService needs to invoke the worker with backoff retry, as defined in the StateOptions of the AsyncState.
- The AsyncService needs to process timer commands as a durable timer service. There will be a "TimerTask" XDB system table that delivers timer tasks to the AsyncService to process.
More details will be provided with example in the next "Detailed Design" section.
The end goal of the XDB project is to support any database, as long as it satisfies these requirements:
- Has transactions or conditional writes for writing multiple records in multiple tables atomically
- Has change data capture (CDC) support
In the initial phase of the project, MySQL, Postgres and (maybe) MongoDB will be implemented first, while making sure any other databases can be supported easily.
Async execution needs some "queue" with backoff retry + timers. XDB will implement the task queue internally using the database (similar to Cadence/Temporal, but with a different architecture for scalability).
However, the ideal solution is to use a separate message queue with CDC for this async implementation. Because this may be difficult for open-source users, as it requires operating a message queue, the message queue dependency needs to be optional.
In the future, a MessageQueue like Apache Pulsar will be used. Technically, other message queues can be used as long as they satisfy these requirements:
- Has backoff retry & DLQ on consuming
- Has consumer groups, so that different consumers can use different cursors when consuming
- Has delayed message delivery for building durable timers
For example, SQS + SNS could work as well, where the visibility timeout can be used for backoff retry + durable timers (with a workaround for the 12-hour limit), and SNS can be used to implement different consumer groups.
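The "12-hour limit" workaround can be illustrated with plain arithmetic: a long durable timer is split into hops no longer than the maximum per-message delay, and the consumer re-schedules the remaining time on each delivery until nothing is left. This sketch only computes the hops and makes no AWS calls; the 12-hour cap corresponds to the SQS visibility-timeout maximum, and `timer_hops` is a hypothetical name.

```python
from typing import List

MAX_DELAY_SECONDS = 12 * 60 * 60  # SQS visibility timeout cap (12 hours)


def timer_hops(total_seconds: int, max_delay: int = MAX_DELAY_SECONDS) -> List[int]:
    """Split one long delay into consecutive re-deliveries of at most max_delay each."""
    hops: List[int] = []
    remaining = total_seconds
    while remaining > 0:
        hop = min(remaining, max_delay)
        hops.append(hop)
        remaining -= hop
    return hops


# A 30-hour reminder timer needs three deliveries: 12h + 12h + 6h.
print(timer_hops(30 * 3600))  # [43200, 43200, 21600]
```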
One reason for choosing Apache Pulsar is that it not only provides all the required features above, but also has a CDC abstraction out of the box, called "Pulsar IO".
This section provides a lot more details on how to implement the XDB service, with an example that walks through how the components work together.
Note that these are just for design discussion purposes. Just like any system design, there could be some modifications/differences in the actual implementation.
In iWF, a "workflowId" is needed when starting a workflow. This workflowId can be reused to start different WorkflowExecutions, as controlled by the "IdReusePolicy". A "RunId" is returned from the server to identify a WorkflowExecution.
The "workflowId" and IdReusePolicy are very powerful/useful. A workflowId can be used directly to represent a business identifier. For example, "user-signup-userId123" can be the workflowId for "userId123". When a workflow execution fails for any reason, another workflow execution can be started reusing the workflowId.
Similarly, in XDB, a "ProcessId" is needed to start a ProcessExecution. Because the same ProcessId can be reused, a ProcessExecutionId is returned to identify each ProcessExecution.
In addition, because the GlobalAttributes are mapped to a table row, a "globalAttributePkValue" is needed so that the XDB service knows which row of the table to load as GlobalAttributes for the ProcessExecution.
The code to start a process will look like this:
```python
client.start_object_process(
    "user-signup-process-userId123",  # processId
    UserSignupProcess,  # Process class
    input,  # input for the starting state, if applicable
    ProcessStartOptions(  # optional configuration like IdReusePolicy, initial data attributes
        "userId123",  # globalAttributePkValue is required if using GlobalAttribute
        ...
    )
)
```
As mentioned above, to extend the database, some system tables are needed by the XDB service.
Below are the system tables that need to be installed into the database.

xdb_sys_process_execution:
Column | Type | Description |
---|---|---|
id(PK) | string | the process execution id |
process_id | string | the process id |
is_current | bool | whether or not it's the current/latest execution for the process_id |
status | string | running/timeout/completed/failed |
start_time | datetime | the start time |
timeout_seconds | int | the timeout |
history_event_id_sequence | int | the sequence number to generate history_event_id for history_event table |
info | blob | additional static info like processType, workerUrl, tableName and globalAttributePkValue; blob is used for extensibility |
Because of the "IdReusePolicy" of ProcessId, "is_current" is needed to know which execution is the latest/current one for the same ProcessId.

Table for LocalAttributes:
Column | Type | Description |
---|---|---|
process_execution_id(PK) | string | the process execution id |
key(PK) | string | the local attribute key |
value | blob | the local attribute value |
process_id | string | the process id for lookup/debugging |
The primary key is composed of two columns: process_execution_id and key.

xdb_sys_async_state_execution:
Column | Type | Description |
---|---|---|
process_execution_id(PK) | string | the process execution id |
async_state_id(PK) | string | the state id |
state_execution_number(PK) | int | the number to indicate this is Xth execution of the async_state_id |
status | string | running/completed/timeout/fail |
input | blob | input of the state execution |
info | blob | static info includes StateOptions |
wait_until_status | string | skipped(if waitUntil API is skipped), running, waiting_commands_completed, completed |
wait_until_commands | blob | the command details that will be waiting for |
wait_until_command_results | blob | the command completed results that have got so far |
execute_status | string | not_started, running, completed |
The primary key is composed of three columns: process_execution_id, async_state_id and state_execution_number. Together they uniquely identify a state execution.
This is a critical table that CDC/MessageQueue/AsyncService will be using.
When a new record is inserted, CDC/MessageQueue delivers it to the AsyncService, which then starts invoking the State APIs of the worker.
xdb_sys_timer_tasks:

Column | Type | Description |
---|---|---|
id(PK) | string | the timer id |
firing_time | datetime | the firing time |
timer_type | string | process_timeout or timer_command |
info | blob | additional info like process_execution_id, state_execution_id |
This is a critical table that CDC/MessageQueue/AsyncService will be using.
When a new record is inserted, CDC/MessageQueue delivers it to the AsyncService, which starts a "delayed message" for the durable timer. When the timer fires, the AsyncService processes it.
The "wait_until_commands" in "xdb_sys_async_state_execution" will refer to this table if there is a "TimerCommand". This means that when the "waitUntil" API returns a TimerCommand, the API service will update "xdb_sys_async_state_execution" and also insert a new record into xdb_sys_timer_tasks.
Similarly, to implement the process timeout for the whole process execution, the API service will also insert a new timer record when inserting an "xdb_sys_process_execution" record.
Note that technically this table is not necessary to implement timers, as CDC can capture changes from xdb_sys_process_execution or xdb_sys_async_state_execution directly to start a timer. This "duplication" makes the implementation cleaner and easier.
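The timer flow above (insert an xdb_sys_timer_tasks row, turn it into a delayed message, process it when it fires) can be mimicked in memory with a heap ordered by firing_time. The `TimerScheduler` class is hypothetical and stands in for the message queue's delayed delivery; unlike the real design, it is not durable, which is exactly what the table plus the queue provide.

```python
import heapq
from typing import List, Tuple


class TimerScheduler:
    """In-memory stand-in for delayed message delivery, ordered by firing time."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, str, str]] = []

    def schedule(self, timer_id: str, firing_time: float, timer_type: str) -> None:
        heapq.heappush(self._heap, (firing_time, timer_id, timer_type))

    def fire_due(self, now: float) -> List[Tuple[str, str]]:
        """Pop every timer whose firing_time has passed, earliest first."""
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, timer_id, timer_type = heapq.heappop(self._heap)
            fired.append((timer_id, timer_type))
        return fired


s = TimerScheduler()
s.schedule("t-process", firing_time=86400.0, timer_type="process_timeout")
s.schedule("t-verify", firing_time=3600.0, timer_type="timer_command")
early = s.fire_due(now=60.0)
late = s.fire_due(now=90000.0)
print(early)  # []
print(late)   # [('t-verify', 'timer_command'), ('t-process', 'process_timeout')]
```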
Table for LocalQueue messages:

Column | Type | Description |
---|---|---|
process_execution_id(PK) | string | the process execution id |
queue_name(PK) | string | the name of the queue |
sequence_id(PK) | int | the auto increment id for FIFO sequence of the queue |
message_id | string | an id for deduping messages received from clients |
message | blob | the value of the message |
Secondary unique index:
- (process_execution_id, queue_name, message_id)
This table represents the LocalQueue and stores the messages of each queue. The API service writes messages into this table, and also checks whether the process execution has met the required LocalQueueCommands.
GlobalQueue is much more complicated than LocalQueue. We omit it for this design and leave it for future with a separate design doc.
Above are the critical system tables that you need to understand for this design proposal. More tables are necessary for the implementation, but you don't have to fully understand them unless you contribute to XDB directly.

Table for the current execution of each process_id:
Column | Type | Description |
---|---|---|
process_id(PK) | string | the process id |
current_execution_id | string | the current/latest execution for the process_id |
Technically this table is not necessary, but it's important for implementing XDB efficiently with database transactions.
Because there could be multiple requests starting ProcessExecutions with the same processId in parallel, the XDB service has to use a database transaction to ensure that only one execution is started, based on the "IdReusePolicy".
This table makes it easy for the transaction to implement the atomic operation. Without it, locking on xdb_sys_process_execution could be inefficient and hard to implement properly.
Table for history events:

Column | Type | Description |
---|---|---|
process_execution_id(PK) | string | the process execution id |
event_id(PK) | int | event sequence id |
event_time | datetime | the event time |
event_type | string | the event type |
event_details | blob | the event details |
This table is non-critical for the implementation. It just records history for debugging purposes. Every action on the ProcessExecution records an event here (how verbose the history is can be configured per execution).
Here we use the "User Signup Workflow" as an example to show the worker and client code, how they execute through the XDB service, and how the above XDB system tables work together.
The requirements are copied from the iWF sample "User Signup Workflow":
- A user fills in a form with an email and submits it to the system
- The system sends an email for verification
- The user clicks the link in the email to verify the account
- If the user doesn't click the link, a reminder is sent every X hours
For this use case, the user just needs a "user" table:
Column | Type | Description |
---|---|---|
user_id(PK) | string | the user id |
form | blob | other static user info submitted from the sign-up form; blob is used for simplicity |
status | string | the status of this user account: "waiting" or "verified" |
source | string | where the account was verified from |
The worker code is mostly the same as iWF's example, with a slight difference in the PersistenceSchema, as mentioned above:
class SubmitState(AsyncState[None]):
    def execute(self, ctx: WorkflowContext, input: None, command_results: CommandResults,
                persistence: Persistence, communication: Communication,
    ) -> StateDecision:
        # the initial global attributes are set atomically when the process is started
        form = persistence.get_global_attribute("form")
        print(f"API to send verification email to {form.email}")
        return StateDecision.single_next_state(VerifyState)
from datetime import timedelta

class VerifyState(AsyncState[None]):
    def wait_until(self, ctx: WorkflowContext, input: None, communication: Communication,
    ) -> CommandRequest:
        # proceed when either the 24-hour timer fires or a "verify" message arrives
        return CommandRequest.for_any_command_completed(
            TimerCommand.timer_command_by_duration(
                timedelta(hours=24)
            ),
            LocalQueueCommand.by_name("verify"),
        )

    def execute(self, ctx: WorkflowContext, input: None, command_results: CommandResults,
                persistence: Persistence, communication: Communication,
    ) -> StateDecision:
        form = persistence.get_global_attribute("form")
        if (
            command_results.local_queue_commands[0].status
            == QueueCommandStatus.RECEIVED
        ):
            # the user verified the account through the verify RPC
            print(f"API to send welcome email to {form.email}")
            return StateDecision.graceful_complete_workflow("done")
        else:
            # the timer fired first: remind the user and wait again
            print(f"API to send a reminder email to {form.email}")
            return StateDecision.single_next_state(VerifyState)
class UserSignupProcess(Process):
    def get_workflow_states(self) -> StateSchema:
        return StateSchema.with_starting_state(SubmitState(), VerifyState())

    def get_persistence_schema(self) -> PersistenceSchema:
        # Python requires positional arguments (the field definitions) before keyword arguments
        return PersistenceSchema.create(
            PersistenceField.global_attribute_def("form", Form),
            PersistenceField.global_attribute_def("status", str),
            PersistenceField.local_attribute_def("source", str),
            table_name="user",
            primary_key="user_id",
            primary_key_type="string",
        )

    def get_communication_schema(self) -> CommunicationSchema:
        return CommunicationSchema.create(
            CommunicationMethod.local_queue_def("verify", None)
        )

    @rpc()
    def verify(
        self, source: str, persistence: Persistence, communication: Communication
    ) -> str:
        status = persistence.get_global_attribute("status")
        if status == "verified":
            return "already verified"
        persistence.set_global_attribute("status", "verified")
        persistence.set_local_attribute("source", source)
        # unblocks the VerifyState that is waiting on the "verify" local queue
        communication.publish_to_local_queue("verify")
        return "done"
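The XDB SDK above is still a proposal and cannot be run directly, so here is a plain-Python simulation of the same control flow. Everything in it (SignupSim, the state functions, the run driver, the timers_before_verify knob) is a hypothetical stand-in for the XDB service, the user table, and the "verify" local queue: SubmitState runs once, then VerifyState repeats, sending a reminder each time its timer wins, until a "verify" message is in the queue.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SignupSim:
    """Hypothetical stand-in for the XDB service, user table, and local queue."""
    email: str
    verify_queue: deque = field(default_factory=deque)  # the "verify" local queue
    sent: list = field(default_factory=list)            # emails the worker "sent"


def submit_state(sim: SignupSim) -> str:
    sim.sent.append(f"verification email to {sim.email}")
    return "VerifyState"


def verify_state(sim: SignupSim) -> Optional[str]:
    # wait_until is "any of (24h timer, verify message)"; in this simulation the
    # timer is treated as having fired whenever the queue is still empty.
    if sim.verify_queue:
        sim.verify_queue.popleft()
        sim.sent.append(f"welcome email to {sim.email}")
        return None  # graceful completion
    sim.sent.append(f"reminder email to {sim.email}")
    return "VerifyState"


def run(sim: SignupSim, timers_before_verify: int) -> None:
    """Drive the states; the user verifies after `timers_before_verify` reminders."""
    state: Optional[str] = submit_state(sim)
    reminders = 0
    while state == "VerifyState":
        if reminders == timers_before_verify:
            sim.verify_queue.append("verify")  # what the verify RPC publishes
        state = verify_state(sim)
        reminders += 1


sim = SignupSim(email="a@example.com")
run(sim, timers_before_verify=2)
```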
The code to start a process looks like this:
client.start_process_execution(
    "user-signup-process-" + user_id,
    UserSignupProcess,
    None,  # None because the starting state doesn't need input
    ProcessStartOptions(
        global_attribute_pk_value=user_id,  # maps the global attributes to a row in the database table
        initial_upsert_global_attributes={
            "form": form_input,
            "status": "waiting",
        },
    ),
)
With the above setup (database, worker, and client code), here is how the process executes internally in the XDB service:
The start request is sent to the API service of an XDB service. The API service will atomically (using a database transaction):
- Insert a new record into xdb_sys_process_execution if it does not exist
- Insert a new record into xdb_sys_timer_task
- Upsert the global attributes into the user table
- Because UserSignupProcess has a starting state, insert a new record into xdb_sys_async_state_execution
  - SubmitState doesn't implement wait_until, so the wait_until_status will be "skipped"
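The start transaction above can be sketched with sqlite3 as four writes inside one transaction. The column sets are simplified and hypothetical (the real system tables are defined earlier in this doc), the execution id scheme is made up, and treating the timer task as a process timeout is an assumption based on the timeout mentioned below.

```python
import json
import sqlite3
from typing import Optional

# Hypothetical, simplified columns for the tables touched at start time.
SCHEMA = """
CREATE TABLE xdb_sys_process_execution (
    process_execution_id TEXT PRIMARY KEY, process_id TEXT UNIQUE, status TEXT);
CREATE TABLE xdb_sys_timer_task (
    process_execution_id TEXT, fire_at REAL, task_type TEXT);
CREATE TABLE user (
    user_id TEXT PRIMARY KEY, form BLOB, status TEXT, source TEXT);
CREATE TABLE xdb_sys_async_state_execution (
    process_execution_id TEXT, state_id TEXT, wait_until_status TEXT,
    execute_status TEXT, PRIMARY KEY (process_execution_id, state_id));
"""


def start_process(conn, process_id: str, user_id: str, form: dict, status: str,
                  timeout_seconds: float, now: float) -> Optional[str]:
    execution_id = f"{process_id}-1"  # hypothetical execution id scheme
    with conn:  # all writes commit together, or none do
        cur = conn.execute(
            "INSERT OR IGNORE INTO xdb_sys_process_execution VALUES (?, ?, 'running')",
            (execution_id, process_id),
        )
        if cur.rowcount == 0:
            return None  # the execution already exists
        # Assumed purpose of this timer task: the process timeout.
        conn.execute(
            "INSERT INTO xdb_sys_timer_task VALUES (?, ?, 'process_timeout')",
            (execution_id, now + timeout_seconds),
        )
        # Upsert the initial global attributes into the user's row.
        conn.execute(
            "INSERT INTO user (user_id, form, status) VALUES (?, ?, ?) "
            "ON CONFLICT(user_id) DO UPDATE SET form = excluded.form, status = excluded.status",
            (user_id, json.dumps(form), status),
        )
        # SubmitState has no wait_until, so it goes straight to the execute phase.
        conn.execute(
            "INSERT INTO xdb_sys_async_state_execution VALUES (?, 'SubmitState', 'skipped', 'running')",
            (execution_id,),
        )
    return execution_id


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
eid = start_process(conn, "user-signup-process-u1", "u1", {"email": "a@example.com"},
                    "waiting", timeout_seconds=7 * 24 * 3600, now=0.0)
```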
The AsyncService will receive two events as a result of the above transaction in the user database:
- A new timer
- A new AsyncStateExecution
For the new timer, AsyncService will produce a new "delayed message" back to Pulsar to wait for the timer to fire. When the timer fires, AsyncService will process it. This walkthrough omits the timeout scenario to focus on the normal case; implementing the timeout in the API service should be straightforward.
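The delayed message can be pictured as a queue that only hands out a message once its delivery time has passed. A minimal in-process sketch follows; it is not the Pulsar client API (the design above relies on the broker's own delayed delivery), and the DelayedQueue class, its produce/poll methods, and the explicit `now` clock are hypothetical.

```python
import heapq
import itertools
from typing import Any


class DelayedQueue:
    """Hypothetical in-process delayed-delivery queue (stand-in for a broker)."""

    def __init__(self) -> None:
        self._heap: list = []
        self._seq = itertools.count()  # tie-breaker keeps FIFO order for equal times

    def produce(self, message: Any, deliver_at: float) -> None:
        heapq.heappush(self._heap, (deliver_at, next(self._seq), message))

    def poll(self, now: float) -> list:
        """Return every message whose delivery time has been reached."""
        ready = []
        while self._heap and self._heap[0][0] <= now:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


q = DelayedQueue()
q.produce({"task": "process-timeout", "execution": "exec-1"}, deliver_at=48 * 3600)
q.produce({"task": "verify-timer", "execution": "exec-1"}, deliver_at=24 * 3600)
before = q.poll(now=3600)
at_24h = q.poll(now=24 * 3600)
```

The sequence counter matters: without it, two messages with the same delivery time would be compared by their payloads, which fails for dicts.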
For the new AsyncStateExecution, AsyncService will call back the worker to execute the SubmitState.execute API implementation. The worker implementation then sends an email to the user. The callback is retried with backoff, which can easily be done with the timer queue in AsyncService.
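Backoff retry on top of a timer queue only needs a rule for when the next attempt fires. A minimal sketch of capped exponential backoff follows; the parameter names and defaults (1 s initial, factor 2, 60 s cap) are hypothetical, and retry_fire_times assumes each attempt fails immediately, so each fire time is the previous one plus the next delay.

```python
def retry_delay(attempt: int, initial: float = 1.0, multiplier: float = 2.0,
                max_interval: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based), capped at max_interval."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    return min(initial * multiplier ** (attempt - 1), max_interval)


def retry_fire_times(first_failure_at: float, max_attempts: int, **policy) -> list:
    """Times at which the timer queue would re-deliver a failing callback."""
    times, t = [], first_failure_at
    for attempt in range(1, max_attempts + 1):
        t += retry_delay(attempt, **policy)
        times.append(t)
    return times


delays = [retry_delay(a) for a in range(1, 8)]
```

Each fire time would be written as a timer task, so a crashed AsyncService instance does not lose pending retries.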
Note that before AsyncService invokes the State APIs, it loads the attributes according to the PersistenceLoadingPolicy: the global attributes from the user table and the local attributes from the xdb_sys_local_attribute table.
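A minimal sketch of that loading step with sqlite3. The LoadingPolicy dataclass is a hypothetical shape for a PersistenceLoadingPolicy, the user table is simplified, and the xdb_sys_local_attribute columns (attr_key, attr_value stored as JSON) are assumptions. Column names are checked against an allowlist taken from the PersistenceSchema because they are interpolated into the SQL.

```python
import json
import sqlite3
from dataclasses import dataclass, field


@dataclass
class LoadingPolicy:
    """Hypothetical shape of a PersistenceLoadingPolicy: which attributes to load."""
    global_attributes: list = field(default_factory=list)
    local_attributes: list = field(default_factory=list)


# Global attribute columns declared in the PersistenceSchema; the allowlist keeps
# the f-string below safe from SQL injection.
GLOBAL_COLUMNS = {"form", "status"}


def load_attributes(conn, policy: LoadingPolicy, user_id: str, execution_id: str):
    global_attrs, local_attrs = {}, {}
    if policy.global_attributes:
        unknown = set(policy.global_attributes) - GLOBAL_COLUMNS
        if unknown:
            raise ValueError(f"not in the persistence schema: {sorted(unknown)}")
        cols = ", ".join(policy.global_attributes)
        row = conn.execute(f"SELECT {cols} FROM user WHERE user_id = ?", (user_id,)).fetchone()
        if row is not None:
            global_attrs = dict(zip(policy.global_attributes, row))
    if policy.local_attributes:
        marks = ", ".join("?" for _ in policy.local_attributes)
        rows = conn.execute(
            "SELECT attr_key, attr_value FROM xdb_sys_local_attribute"
            f" WHERE process_execution_id = ? AND attr_key IN ({marks})",
            (execution_id, *policy.local_attributes),
        )
        local_attrs = {key: json.loads(value) for key, value in rows}
    return global_attrs, local_attrs


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE user (user_id TEXT PRIMARY KEY, form TEXT, status TEXT);
CREATE TABLE xdb_sys_local_attribute (process_execution_id TEXT, attr_key TEXT,
    attr_value TEXT, PRIMARY KEY (process_execution_id, attr_key));
INSERT INTO user VALUES ('u1', '{"email": "a@example.com"}', 'waiting');
INSERT INTO xdb_sys_local_attribute VALUES ('exec-1', 'source', '"email"');
""")
glob_attrs, loc_attrs = load_attributes(
    conn, LoadingPolicy(["form", "status"], ["source"]), "u1", "exec-1")
```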
The worker executes SubmitState.execute and returns a StateDecision to go to the next state.
On receiving this StateDecision, AsyncService will atomically:
- update the status of the current AsyncStateExecution
- write a new AsyncStateExecution for the next state, VerifyState
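A minimal sqlite3 sketch of this atomic transition. The state_seq column (so that VerifyState can run more than once), the "pending" status, and the apply_single_next_state helper are hypothetical. The UPDATE only matches a state execution that is still running, so a redelivered or duplicate decision raises and rolls back instead of creating a second next state.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical columns; state_seq distinguishes repeated runs of the same state.
conn.execute(
    "CREATE TABLE xdb_sys_async_state_execution ("
    " process_execution_id TEXT, state_id TEXT, state_seq INTEGER,"
    " wait_until_status TEXT, execute_status TEXT,"
    " PRIMARY KEY (process_execution_id, state_id, state_seq))"
)


def apply_single_next_state(conn, execution_id: str, state_id: str, state_seq: int,
                            next_state_id: str, next_has_wait_until: bool) -> int:
    with conn:  # both writes commit together; an exception rolls both back
        cur = conn.execute(
            "UPDATE xdb_sys_async_state_execution SET execute_status = 'completed'"
            " WHERE process_execution_id = ? AND state_id = ? AND state_seq = ?"
            " AND execute_status = 'running'",
            (execution_id, state_id, state_seq),
        )
        if cur.rowcount != 1:
            raise RuntimeError("state execution is not running; decision ignored")
        (next_seq,) = conn.execute(
            "SELECT COALESCE(MAX(state_seq), 0) + 1 FROM xdb_sys_async_state_execution"
            " WHERE process_execution_id = ? AND state_id = ?",
            (execution_id, next_state_id),
        ).fetchone()
        conn.execute(
            "INSERT INTO xdb_sys_async_state_execution VALUES (?, ?, ?, ?, ?)",
            (execution_id, next_state_id, next_seq,
             "running" if next_has_wait_until else "skipped",
             "pending" if next_has_wait_until else "running"),
        )
    return next_seq


conn.execute("INSERT INTO xdb_sys_async_state_execution VALUES "
             "('exec-1', 'SubmitState', 1, 'skipped', 'running')")
conn.commit()
seq = apply_single_next_state(conn, "exec-1", "SubmitState", 1, "VerifyState", True)
```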
The new AsyncStateExecution for VerifyState again goes through the TaskQueue and is finally delivered to AsyncService as another event.
This time, the state implements the wait_until API, so AsyncService calls back the worker to execute the VerifyState.wait_until implementation. It returns:
CommandRequest.for_any_command_completed(
TimerCommand.timer_command_by_duration(
timedelta(hours=24)
),
LocalQueueCommand.by_name("verify"),
)
To handle this commandRequest, AsyncService will atomically:
- check whether the LocalQueue "verify" has any message that completes the command
- if there is none, mark the status as "waiting_commands_completed" with the commandRequest
- also write a new record to xdb_sys_timer_tasks for the 24-hour TimerCommand
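The steps above can be sketched in memory, ignoring the database transaction that wraps them in XDB. StateExecution, handle_command_request, and the plain dict/list stand-ins for the local queues and the timer task table are hypothetical. The first case shows the normal path (no message yet, so wait and add a timer task); the second shows a message that arrived before wait_until finished and is consumed immediately.

```python
from dataclasses import dataclass, field


@dataclass
class StateExecution:
    """Hypothetical in-memory view of one xdb_sys_async_state_execution row."""
    state_id: str
    wait_until_status: str = "running"
    command_results: dict = field(default_factory=dict)


def handle_command_request(state: StateExecution, local_queues: dict, timer_tasks: list,
                           queue_name: str, timer_seconds: float, now: float) -> str:
    """Handle for_any_command_completed(timer, local queue) for one state execution."""
    pending = local_queues.get(queue_name)
    if pending:
        # A message is already queued: consume it and complete the wait now.
        state.command_results[queue_name] = pending.pop(0)
        state.wait_until_status = "completed"
        return "execute"
    state.wait_until_status = "waiting_commands_completed"
    timer_tasks.append((state.state_id, now + timer_seconds))  # new timer task row
    return "wait"


queues = {"verify": []}
timers = []
s1 = StateExecution("VerifyState-1")
outcome1 = handle_command_request(s1, queues, timers, "verify", 24 * 3600, now=0.0)
queues["verify"].append("verify-msg")
s2 = StateExecution("VerifyState-2")
outcome2 = handle_command_request(s2, queues, timers, "verify", 24 * 3600, now=0.0)
```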
AsyncService will receive an event for the newly added timer task. As before, it will produce a delayed "timer task" message so that AsyncService can wait for the timer to fire.
Assuming nothing changes for 24 hours, the timer fires when the delayed message arrives. AsyncService receives this delayed message and handles the timer firing.
First, AsyncService checks whether this timer firing is enough to complete the commandRequest of the stateExecution.
If it's not enough, it just updates the wait_until_command_results.
If it's enough, it moves the stateExecution from the wait_until phase to the execute phase.
In this example, it's enough, because the commandRequest is "any_command_completed(...)".
Therefore, it will:
- mark the wait_until_status as "completed" and the execute_status as "running"
- add the fired timer to the wait_until_command_results
- clean up the timer tasks
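The "is this enough?" check reduces to a predicate over the statuses of the commands in the request. A minimal sketch: the CommandStatus enum and the "any"/"all" mode strings are hypothetical, and the "all" mode is an assumption by analogy with iWF's for_all_command_completed, since this doc only uses for_any_command_completed.

```python
from enum import Enum


class CommandStatus(Enum):
    """Hypothetical per-command status tracked in wait_until_command_results."""
    WAITING = "waiting"
    COMPLETED = "completed"


def is_request_completed(mode: str, statuses: list) -> bool:
    """mode "any" mirrors for_any_command_completed; "all" is an assumed counterpart."""
    done = [s is CommandStatus.COMPLETED for s in statuses]
    if mode == "any":
        return any(done)
    if mode == "all":
        return all(done)
    raise ValueError(f"unknown mode: {mode}")


# The scenario above: the 24-hour timer fired, the "verify" queue command is still waiting.
timer_fired = [CommandStatus.COMPLETED, CommandStatus.WAITING]
```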
On receiving the event that an AsyncStateExecution is "running", AsyncService invokes the VerifyState.execute API with the commandResults. The commandResults show that the timer has fired but no message has been received on the LocalQueue.
As in AsyncService: 1, the XDB service loads the attributes before invoking the API.
Therefore, the worker takes the branch that sends a reminder email to the user, and returns a StateDecision to go back to VerifyState.
On receiving the StateDecision, AsyncService marks the current StateExecution as completed and inserts a new AsyncStateExecution record.
Then the same steps as in "AsyncService: 2" and "AsyncService: 3" happen again to execute the new AsyncStateExecution's wait_until and start a timer.
This time, the user clicks the link in the email, which invokes an RPC from the client:
client.invoke_rpc("user-signup-process-" + user_id, UserSignupProcess.verify, source)
When the API service handles this request, it invokes the RPC implementation in the worker.
As when AsyncService invokes the State APIs, the API service loads the attributes according to the PersistenceLoadingPolicy before invoking the RPC.
UserSignupProcess.verify is invoked and returns a response that will:
- update the data attribute "status" to "verified"
- update the data attribute "source" to "email"
- send a message to the LocalQueue "verify"
The API service processes this RPC response atomically:
- update the "status" and "source" columns of the "user" table
- check whether the new message for the LocalQueue "verify" is enough to complete a stateExecution
  - if not, just add it to the queue
  - if yes, add the message to the wait_until_command_results, and mark the wait_until_status as completed and the execute_status as running
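The message-delivery half of this transaction can be sketched in memory as follows. WaitingState and publish are hypothetical names, the sketch assumes each waiting state is blocked on a single local-queue command under for_any_command_completed, and it leaves out the "user" table update that XDB would perform in the same transaction.

```python
from dataclasses import dataclass, field


@dataclass
class WaitingState:
    """Hypothetical view of a state execution blocked on one local-queue command."""
    queue_name: str
    wait_until_status: str = "waiting_commands_completed"
    execute_status: str = "pending"
    wait_until_command_results: list = field(default_factory=list)


def publish(queues: dict, states: list, queue_name: str, message) -> str:
    """Deliver one message: complete the first matching waiting state, else enqueue it."""
    for state in states:
        if (state.queue_name == queue_name
                and state.wait_until_status == "waiting_commands_completed"):
            state.wait_until_command_results.append(message)
            state.wait_until_status = "completed"
            state.execute_status = "running"  # AsyncService will now invoke execute
            return "completed_state"
    queues.setdefault(queue_name, []).append(message)
    return "queued"


queues = {}
states = [WaitingState("verify")]
first = publish(queues, states, "verify", {"source": "email"})
second = publish(queues, states, "verify", {"source": "email"})
```

The second message is queued rather than dropped, so a later VerifyState would find it when it checks the queue during its own wait_until handling.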
On receiving the event that an AsyncStateExecution is "running", AsyncService invokes the VerifyState.execute API with the commandResults.
This time, the commandResults show that the LocalQueueCommand has completed. So the worker sends a welcome email to the user and returns a StateDecision to complete the process.
On receiving the StateDecision, AsyncService will mark the stateExecution as completed, and the processExecution as completed.
TODO
TODO
- Search Attribute
- Caching