MSK Configuration Details

The MSK event automatically handles many of the common tasks involved in responding to an event, whether it is invoked manually or programmatically. Developers can then extend that functionality with custom middleware. Below is a full list of the available configurations, with examples of their use.

Examples

Don't like reading documentation? Then look at our examples, which can be deployed in 1 command into your AWS account! 🤓

Lambda Configuration

functions:
    msk-handler:
        handler: service/handlers/msk.handle
        memorySize: 512
        timeout: 30
        events:
            - msk:
                arn: arn:aws:kafka:region:XXXXXX:cluster/MyCluster/xxxx-xxxxx-xxxx
                topic: mytopic
from acai_aws.msk.requirements import requirements
from acai_aws.common import logger


# example data class
class SomeClass:
    def __init__(self, record):
        # copy each key of the record body onto the instance
        for k, v in record.body.items():
            setattr(self, k, v)


# example before function
def log_something(records, requirements):
    logger.log(log=records)


# example after function
def alert_something(event, result, requirements):
    if 'something' in result and 'alert' in requirements:
        logger.log(log=event)


# the hooks and data class must be defined before the decorator references them
@requirements(
    before=log_something,
    data_class=SomeClass,
    raise_operation_error=True,
    raise_body_error=True,
    schema='service/openapi.yml',
    required_body='v1-msk-body', # or jsonschema dict
    after=alert_something,
)
def handle(event):
    for record in event.records:
        logger.log(log=record.body)
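
To make the `data_class` option concrete, the sketch below shows what `SomeClass` would hold for a single record. `FakeRecord` is a hypothetical stand-in for the record object the library passes in; only its `body` attribute is modeled, using the sample body from the `record.body` section of this page.

```python
# Hypothetical stand-in for a library record; only `body` is modeled here.
class FakeRecord:
    def __init__(self, body):
        self.body = body


# Same data class as in the example above.
class SomeClass:
    def __init__(self, record):
        # copy each key of the record body onto the instance
        for k, v in record.body.items():
            setattr(self, k, v)


item = SomeClass(FakeRecord({'new_data': '123456789'}))
print(item.new_data)
```

Each key of the body becomes an attribute, so handler code can read `item.new_data` instead of indexing into a dict.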

Requirements Configuration Options

| option | type | required | default | description |
| --- | --- | --- | --- | --- |
| before | func | no | None | a custom function to be run before your records are pulled |
| after | func | no | None | a custom function to be run after your records are pulled |
| data_class | class | no | None | a custom class that will be passed instead of the records object |
| raise_body_error | bool | no | False | will raise an exception if the body of a record does not match the schema provided |
| required_body | str or dict | no | None | will validate the body of each record against this schema |
| schema | str | no | None | file path pointing to the location of the openapi.yml file |
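
The ordering implied by `before` and `after` can be illustrated with a plain-Python decorator. This is only a sketch of the general middleware pattern, not the library's implementation; `requirements_sketch` and the hook signatures used here are assumptions made for illustration.

```python
import functools


def requirements_sketch(before=None, after=None, **options):
    """Hypothetical decorator: run `before`, then the handler, then `after`."""
    def decorator(handler):
        @functools.wraps(handler)
        def wrapper(event):
            if before is not None:
                before(event, options)
            result = handler(event)
            if after is not None:
                after(event, result, options)
            return result
        return wrapper
    return decorator


calls = []


@requirements_sketch(
    before=lambda event, reqs: calls.append('before'),
    after=lambda event, result, reqs: calls.append('after'),
)
def handle(event):
    calls.append('handler')
    return len(event)


result = handle(['record-1', 'record-2'])
print(calls, result)
```

The point of the pattern is that cross-cutting work such as logging or alerting stays out of the handler body.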

MQ Event Properties

| property | type | description |
| --- | --- | --- |
| message_id | str | id of the message |
| message_type | str | type of the message |
| delivery_mode | int | mode of delivery |
| reply_to | str (nullable) | reply to string |
| record_type | str (nullable) | type of record |
| expiration | str | expiration of message |
| priority | int | priority of message |
| correlation_id | str | correlation id of message |
| redelivered | bool | whether the message has been redelivered |
| destination | dict | destination of the message |
| properties | dict | properties of the message |
| time_stamp | int | time_stamp of the message |
| in_time | int | in time of the message |
| out_time | int | out time of the message |
| body | any | body of the message |
| data | any | data of the message |

record.message_id

print(record.message_id)

# output
'ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1'

record.message_type

print(record.message_type)

# output
'jms/text-message'

record.delivery_mode

print(record.delivery_mode)

# output
1

record.reply_to

print(record.reply_to)

# output
None

record.record_type

print(record.record_type)

# output
None

record.expiration

print(record.expiration)

# output
'60000'

record.priority

print(record.priority)

# output
1

record.correlation_id

print(record.correlation_id)

# output
'myJMSCoID'

record.redelivered

print(record.redelivered)

# output
False

record.destination

print(record.destination)

# output
{
    'physicalName': 'testQueue'
}

record.properties

print(record.properties)

# output
{
    'index': '1',
    'doAlarm': 'false',
    'myCustomProperty': 'value'
}
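
In the sample output above, every property value is a string, including `'false'` and `'1'`. If your producers send values the same way, note that a non-empty string such as `'false'` is truthy in Python, so it needs explicit conversion. The sketch below uses a plain dict with the sample values; `as_bool` is a hypothetical helper, not part of the library.

```python
def as_bool(value):
    """Interpret common string spellings of a boolean (hypothetical helper)."""
    return str(value).strip().lower() in ('true', '1', 'yes')


# Sample values copied from the output above.
properties = {
    'index': '1',
    'doAlarm': 'false',
    'myCustomProperty': 'value',
}

index = int(properties['index'])
do_alarm = as_bool(properties['doAlarm'])

print(bool(properties['doAlarm']))  # True: any non-empty string is truthy
print(do_alarm)                     # False
print(index)                        # 1
```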

record.time_stamp

print(record.time_stamp)

# output
1598827811958

record.in_time

print(record.in_time)

# output
1598827811958

record.out_time

print(record.out_time)

# output
1598827811959
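
The sample values for `time_stamp`, `in_time`, and `out_time` look like Unix epoch timestamps in milliseconds; that is an assumption based on their magnitude, not something this page states. Under that assumption, they could be converted as follows, with `SimpleNamespace` standing in for a real record.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

# Stand-in record holding the sample values shown above.
record = SimpleNamespace(
    time_stamp=1598827811958,
    in_time=1598827811958,
    out_time=1598827811959,
)

# Assumes epoch milliseconds; divide by 1000 to get seconds.
sent_at = datetime.fromtimestamp(record.time_stamp / 1000, tz=timezone.utc)

# Difference between the out and in times, in milliseconds.
broker_ms = record.out_time - record.in_time

print(sent_at.date().isoformat())
print(broker_ms)
```

Passing `tz=timezone.utc` keeps the result independent of the Lambda runtime's local time zone.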

record.body

print(record.body)

# output
{
    'new_data': '123456789'
}

record.data

print(record.data)

# output
{
    'new_data': '123456789'
}