
Firehose Configuration Details

The firehose event automatically handles many of the common tasks involved in processing records from a Firehose stream. Developers can extend that behavior with custom middleware. Below is a full list of the available configurations, with examples of their use.

Examples

Don't like reading documentation? Then look at our examples, which can be deployed in 1 command into your AWS account! 🤓

Lambda Configuration

functions:
    firehose-handler:
        handler: service/handlers/firehose.handle
        memorySize: 512
        timeout: 30
        events:
            - stream:
                type: firehose
                arn:
                    Fn::GetAtt: [ MyfirehoseStream, Arn ]

from acai_aws.firehose.requirements import requirements
from acai_aws.common import logger

# example data class
class SomeClass:
    def __init__(self, record):
        for k, v in record.body.items():
            setattr(self, k, v)

# example before function
def log_something(records, requirements):
    if 'something' in requirements:
        logger.log(log=records)

# example after function
def alert_something(records, result, requirements):
    if 'something' in result and 'alert' in requirements:
        logger.log(log=records)

# the helpers above are defined first because the decorator
# arguments are evaluated when handle is defined
@requirements(
    before=log_something,
    data_class=SomeClass,
    raise_operation_error=True,
    raise_body_error=True,
    schema='service/openapi.yml',
    required_body='v1-firehose-body', # or jsonschema dict
    after=alert_something,
)
def handle(event):
    for record in event.records:
        logger.log(log=record)
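To make the role of the before, after, and data_class options more concrete, here is a minimal sketch of how a decorator of this kind could order its hooks. It is not the acai_aws implementation: sketch_requirements, Upper, and the calls list are hypothetical names, and the handler receives a plain list of records rather than an event object. Only the hook signatures, (records, requirements) and (records, result, requirements), are taken from the example above.

```python
from functools import wraps


def sketch_requirements(before=None, after=None, data_class=None, **options):
    """Hypothetical stand-in for a requirements-style decorator (not acai_aws code)."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(records):
            # run the optional before hook first, with the raw records
            if before is not None:
                before(records, options)
            # wrap each record in data_class when one is configured
            if data_class is not None:
                records = [data_class(record) for record in records]
            result = handler(records)
            # run the optional after hook last, with the handler's result
            if after is not None:
                after(records, result, options)
            return result
        return wrapper
    return decorator


calls = []  # records the order in which the pieces run


class Upper:
    # hypothetical data class: stores an upper-cased copy of the record
    def __init__(self, record):
        self.value = record.upper()


@sketch_requirements(
    before=lambda records, requirements: calls.append('before'),
    after=lambda records, result, requirements: calls.append('after'),
    data_class=Upper,
)
def handle(records):
    calls.append('handler')
    return [record.value for record in records]


result = handle(['a', 'b'])
print(calls)
print(result)
```

In this sketch the before hook sees the raw records and the after hook sees the wrapped ones; whether the real library does the same is an assumption worth checking against its source.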

Requirements Configuration Options

option            type         required  default  description
before            func         no        None     a custom function to run before your records are pulled
after             func         no        None     a custom function to run after your records are pulled
data_class        class        no        None     a custom class that will be passed instead of the record object
raise_body_error  bool         no        False    raises an exception if the body of a record does not match the schema provided
required_body     str or dict  no        None     validates the body of each record against this schema
schema            str          no        None     file path pointing to the location of the openapi.yml file
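Since required_body accepts a dict as well as a schema name, the following is a small sketch of what such a dict might look like. The field name new_data is borrowed from the record examples on this page; FIREHOSE_BODY_SCHEMA and missing_required_keys are hypothetical names, and the helper only checks required keys to illustrate the idea. It is not a full JSON Schema validator and not how the library validates.

```python
# Hypothetical JSON Schema for a record body with one required string field.
FIREHOSE_BODY_SCHEMA = {
    'type': 'object',
    'required': ['new_data'],
    'additionalProperties': False,
    'properties': {
        'new_data': {'type': 'string'},
    },
}


def missing_required_keys(body, schema):
    """Return the required keys absent from body (illustrative check only)."""
    return [key for key in schema.get('required', []) if key not in body]


print(missing_required_keys({'new_data': '123456789'}, FIREHOSE_BODY_SCHEMA))
print(missing_required_keys({'other': 1}, FIREHOSE_BODY_SCHEMA))
```

A dict like this would be passed as required_body=FIREHOSE_BODY_SCHEMA in place of the 'v1-firehose-body' schema name.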

Firehose Record Properties

property            type  description
record_id           str   record ID of the stream
epoc_time_stamp     int   epoch timestamp of the stream
shard_id            str   shard ID of the stream
subsequence_number  str   subsequence number of the stream
partition_key       str   partition key
time_stamp          str   timestamp
sequence_number     str   sequence number
data                any   the record data, automatically base64-decoded; can be any type
body                any   the record body, automatically base64-decoded; can be any type
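The data and body properties are described as automatically base64-decoded. As a rough illustration of what that decoding involves, the sketch below decodes a base64 string and parses it as JSON when possible. decode_data is a hypothetical helper written for this example, not the library's own code, and the fallback to plain text is an assumption.

```python
import base64
import json


def decode_data(raw):
    """Base64-decode a record's data and parse it as JSON when possible."""
    decoded = base64.b64decode(raw).decode('utf-8')
    try:
        return json.loads(decoded)
    except json.JSONDecodeError:
        # not JSON, so fall back to the decoded text
        return decoded


# build an encoded payload similar to what a stream record would carry
payload = json.dumps({'new_data': '123456789'}).encode('utf-8')
encoded = base64.b64encode(payload).decode('utf-8')

print(decode_data(encoded))
```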

record.record_id

print(record.record_id)

# output
'record1'

record.epoc_time_stamp

print(record.epoc_time_stamp)

# output
1510772160000

record.shard_id

print(record.shard_id)

# output
'shardId-000000000000'

record.subsequence_number

print(record.subsequence_number)

# output
''

record.partition_key

print(record.partition_key)

# output
'1'

record.time_stamp

print(record.time_stamp)

# output
1545084650.987

record.sequence_number

print(record.sequence_number)

# output
49590338271490256608559692538361571095921575989136588898

record.data

print(record.data)

# output
{
    'new_data': '123456789'
}

record.body

print(record.body)

# output
{
    'new_data': '123456789'
}