Pull and ingest data from a third-party API
This tutorial builds a data pipeline that pulls data from a third-party finance API and loads it into TimescaleDB.
This tutorial requires multiple libraries, which can make your deployment package larger than Lambda's 250 MB limit. You can use a Docker container to extend the package size up to 10 GB, giving you much more flexibility in libraries and dependencies. For more about AWS Lambda container support, see the AWS documentation.
The libraries used in this tutorial are pandas, requests, psycopg2, and pgcopy.
Create an ETL function
Extract, transform, and load (ETL) functions are used to pull data from one database and ingest the data into another. In this tutorial, the ETL function pulls data from a finance API called Alpha Vantage, and inserts the data into TimescaleDB. The connection is made using the values from environment variables.
This is the ETL function used in this tutorial:
```python
# function.py:
import csv
import os

import pandas as pd
import psycopg2
from pgcopy import CopyManager

config = {'DB_USER': os.environ['DB_USER'],
          'DB_PASS': os.environ['DB_PASS'],
          'DB_HOST': os.environ['DB_HOST'],
          'DB_PORT': os.environ['DB_PORT'],
          'DB_NAME': os.environ['DB_NAME'],
          'APIKEY': os.environ['APIKEY']}

conn = psycopg2.connect(database=config['DB_NAME'],
                        host=config['DB_HOST'],
                        user=config['DB_USER'],
                        password=config['DB_PASS'],
                        port=config['DB_PORT'])

columns = ('time', 'price_open', 'price_close',
           'price_low', 'price_high', 'trading_volume', 'symbol')


def get_symbols():
    """Read ticker symbols from a CSV file.

    Returns:
        list of strings: symbols
    """
    with open('symbols.csv') as f:
        reader = csv.reader(f)
        return [row[0] for row in reader]


def fetch_stock_data(symbol, month):
    """Fetch historical intraday data for one ticker symbol (1-minute interval).

    Args:
        symbol (string): ticker symbol
        month (int): month value as an integer 1-24 (for example, month=4
            fetches data from 4 months ago)

    Returns:
        list of tuples: intraday (candlestick) stock data
    """
    interval = '1min'
    # Slices 1-12 are in year1 of the API's two-year window, slices 13-24 in year2:
    slice = 'year1month' + str(month) if month <= 12 else 'year2month' + str(month - 12)
    apikey = config['APIKEY']
    CSV_URL = 'https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY_EXTENDED&' \
              'symbol={symbol}&interval={interval}&slice={slice}&apikey={apikey}' \
              .format(symbol=symbol, slice=slice, interval=interval, apikey=apikey)
    df = pd.read_csv(CSV_URL)
    df['symbol'] = symbol
    df['time'] = pd.to_datetime(df['time'], format='%Y-%m-%d %H:%M:%S')
    df = df.rename(columns={'open': 'price_open',
                            'close': 'price_close',
                            'high': 'price_high',
                            'low': 'price_low',
                            'volume': 'trading_volume'})
    return [row for row in df.itertuples(index=False, name=None)]


def handler(event, context):
    symbols = get_symbols()
    for symbol in symbols:
        print("Fetching data for: ", symbol)
        for month in range(1, 2):
            stock_data = fetch_stock_data(symbol, month)
            print('Inserting data...')
            mgr = CopyManager(conn, 'stocks_intraday', columns)
            mgr.copy(stock_data)
            conn.commit()
```
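The slice computation is easy to get wrong, because Alpha Vantage splits its two-year intraday history into 24 one-month slices named year1month1 through year2month12. A standalone sketch of the mapping from a trailing-month index to a slice name (a hypothetical helper, not part of the tutorial code):

```python
def month_to_slice(month):
    """Map a trailing-month index (1-24) to an Alpha Vantage slice name.

    Months 1-12 map to year1month1..year1month12 (the most recent year);
    months 13-24 map to year2month1..year2month12 (the year before).
    """
    if not 1 <= month <= 24:
        raise ValueError('month must be between 1 and 24')
    year, m = divmod(month - 1, 12)
    return f'year{year + 1}month{m + 1}'

print(month_to_slice(4))   # year1month4
print(month_to_slice(13))  # year2month1
```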
Add a requirements file
When you have created the ETL function, you need to include the libraries you want to install. You can do this by creating a text file in your project called requirements.txt that lists the libraries. This is the requirements.txt file used in this tutorial:
pandas
requests
psycopg2-binary
pgcopy
note
This example uses psycopg2-binary instead of psycopg2 in the requirements.txt file. The binary version of the library contains all its dependencies, so you don't need to install them separately.
Create the Dockerfile
When you have the requirements set up, you can create the Dockerfile for the project.
Creating the Dockerfile
Use an AWS Lambda base image:

```dockerfile
FROM public.ecr.aws/lambda/python:3.8
```

Copy all project files to the root directory:

```dockerfile
COPY function.py .
COPY requirements.txt .
```

Install the libraries using the requirements file:

```dockerfile
RUN pip install -r requirements.txt
```

Set the container command to the Lambda handler:

```dockerfile
CMD ["function.handler"]
```
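Before pushing the image, you can smoke-test it locally: the AWS Lambda base images bundle the Runtime Interface Emulator, which accepts invocations over HTTP. This sketch assumes the container was started with docker run -p 9000:8080 lambda-image; the request itself is commented out so the snippet runs without the container:

```python
import json

# The Runtime Interface Emulator listens on port 8080 inside the container;
# with `docker run -p 9000:8080 lambda-image` it is reachable on localhost:9000.
invoke_url = 'http://localhost:9000/2015-03-31/functions/function/invocations'
event = {}  # a scheduled trigger carries no payload that this handler uses

payload = json.dumps(event)
# import requests
# requests.post(invoke_url, data=payload)  # uncomment with the container running
```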
Upload the image to ECR
To connect the container image to a Lambda function, you need to upload it to the AWS Elastic Container Registry (ECR).
Uploading the image to ECR
Authenticate your Docker client to the ECR registry:

```shell
aws ecr get-login-password --region us-east-1 \
    | docker login --username AWS \
    --password-stdin <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com
```
Build the image:

```shell
docker build -t lambda-image .
```

Create a repository in ECR. In this example, the repository is called lambda-image:

```shell
aws ecr create-repository --repository-name lambda-image
```

Tag your image using the same name as the repository:

```shell
docker tag lambda-image:latest <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/lambda-image:latest
```

Deploy the image to Amazon ECR with Docker:

```shell
docker push <AWS_ACCOUNT_ID>.dkr.ecr.us-east-1.amazonaws.com/lambda-image:latest
```
Create a Lambda function from the container
To create a Lambda function from your container, you can use the Lambda create-function command. You need to set the --package-type parameter to Image, and add the ECR image URI using the --code flag:

```shell
aws lambda create-function --region us-east-1 \
    --function-name docker_function --package-type Image \
    --code ImageUri=<ECR Image URI> --role <ARN_LAMBDA_ROLE>
```
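If you prefer to script deployments, the same call can be sketched with boto3, the AWS SDK for Python. The image URI and role ARN below are placeholders, and the API call is commented out because it needs valid AWS credentials:

```python
# Parameters mirroring the `aws lambda create-function` command above;
# <ECR Image URI> and <ARN_LAMBDA_ROLE> are placeholders to substitute.
create_function_kwargs = {
    'FunctionName': 'docker_function',
    'PackageType': 'Image',  # deploy from a container image instead of a zip
    'Code': {'ImageUri': '<ECR Image URI>'},
    'Role': '<ARN_LAMBDA_ROLE>',
}

# import boto3
# client = boto3.client('lambda', region_name='us-east-1')
# client.create_function(**create_function_kwargs)
```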
Schedule the Lambda function
If you want to run your Lambda function according to a schedule, you can set up an EventBridge trigger. This creates a rule using a cron expression.
Scheduling the Lambda function
Create the schedule. In this example, the function runs every day at 9:00 AM UTC:

```shell
aws events put-rule --name schedule-lambda --schedule-expression 'cron(0 9 * * ? *)'
```
Grant the necessary permissions for the Lambda function:

```shell
aws lambda add-permission --function-name <FUNCTION_NAME> \
    --statement-id my-scheduled-event --action 'lambda:InvokeFunction' \
    --principal events.amazonaws.com
```
Add the function to the EventBridge rule by creating a targets.json file containing a memorable, unique string, and the ARN of the Lambda function:

```json
[
    {
        "Id": "docker_lambda_trigger",
        "Arn": "<ARN_LAMBDA_FUNCTION>"
    }
]
```
Add the Lambda function, referred to in this command as the target, to the rule:

```shell
aws events put-targets --rule schedule-lambda --targets file://targets.json
```
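If you generate targets.json from a script rather than writing it by hand, a minimal Python sketch (the Arn is a placeholder to replace with your function's real ARN):

```python
import json

# Build the targets file for `aws events put-targets`.
targets = [
    {
        "Id": "docker_lambda_trigger",   # any memorable, unique string
        "Arn": "<ARN_LAMBDA_FUNCTION>",  # placeholder for the real ARN
    }
]

with open('targets.json', 'w') as f:
    json.dump(targets, f, indent=4)
```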
important
If you get an error saying Parameter ScheduleExpression is not valid, you might have made a mistake in the cron expression. Check the cron expression examples documentation.
You can check if the rule is connected correctly to the Lambda function in the AWS console. Navigate to Amazon EventBridge → Events → Rules, and click the rule you created. The Lambda function's name is listed under Target(s).