In today’s digital age, social media has become an integral part of people’s lives. Every day, millions of users create and share content such as photos, videos, and text posts on social media platforms like Facebook, Twitter, Instagram, LinkedIn, and more. As a developer, you may want to extract and analyze this social media data to gain valuable insights into user behavior, sentiment analysis, and social media trends.
Amazon Kinesis is a fully managed service that makes it easy to build data streaming pipelines to collect, process, and analyze streaming data in real time. In this tutorial, we will learn how to create pipelines to stream social media data with Amazon Kinesis.
Prerequisites
Before we begin, you will need the following:
- An AWS account
- Familiarity with AWS Kinesis, AWS Lambda, and AWS S3
- Python 3.x installed on your local machine
Step 1 – Create an AWS Kinesis data stream
First, we need to create a Kinesis data stream in the AWS Management Console. Follow the steps below:
- Open the AWS Management Console in your web browser and log in to your AWS account
- Navigate to the Kinesis service and click on the ‘Get started’ button if you see a welcome page
- Select the ‘Data streams’ option from the left-hand menu
- Click ‘Create data stream’ and enter a stream name of your choice.
- In the ‘Number of shards’ field, enter the number of shards you need. A shard is a unit of capacity in an AWS Kinesis data stream; each shard supports writes of up to 1 MiB or 1,000 records per second.
- Click ‘Create data stream’
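Before picking a shard count, it helps to size it against your expected write throughput. The sketch below (the throughput figures passed in are hypothetical examples, not measurements) computes the minimum shard count from the per-shard write limits of 1 MiB/s and 1,000 records/s:

```python
import math

# Kinesis per-shard write limits: 1 MiB/s and 1,000 records/s.
SHARD_MIB_PER_SEC = 1.0
SHARD_RECORDS_PER_SEC = 1000

def shards_needed(records_per_sec, avg_record_kib):
    """Return the minimum shard count for the given write throughput."""
    by_bytes = (records_per_sec * avg_record_kib / 1024) / SHARD_MIB_PER_SEC
    by_count = records_per_sec / SHARD_RECORDS_PER_SEC
    return max(1, math.ceil(max(by_bytes, by_count)))

# Example: 2,500 tweets/s averaging 2 KiB each.
print(shards_needed(2500, 2))  # 5 (byte throughput is the binding limit here)
```

Whichever limit you hit first, bytes per second or records per second, determines the shard count.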
Step 2 – Create a Python script to stream social media data to Kinesis
In this step, we will create a Python script to stream data to the Kinesis data stream we just created. The Python script will use the Tweepy library to stream data from Twitter to Kinesis. For this script to work, you will need to set up a Twitter developer account and obtain API keys and access tokens. Follow the steps below:
- Go to ‘https://developer.twitter.com/en/apps’ and log in with your Twitter account. If you don’t have a developer account, you can create one.
- Create a new app and enter the required details. After the app is created, go to the app settings page.
- Click on the ‘Keys and Tokens’ tab and obtain the ‘Consumer Key,’ ‘Consumer Secret,’ ‘Access Token,’ and ‘Access Token Secret.’ These keys will be used to authenticate your script and stream data from Twitter.
- Create a new Python file in your favorite editor and name it ‘streaming-data-to-kinesis.py.’
- Install the ‘tweepy’ library by running the following command in your terminal. The script below uses the ‘StreamListener’ API, which was removed in Tweepy 4.0, so pin the 3.x series:
pip install 'tweepy<4'
- Next, we will import the necessary packages and define the credentials to access the Twitter API. Replace ‘xxxxxx’ with your own keys and tokens:
import tweepy
import json
import boto3
consumer_key = 'xxxxxx'
consumer_secret = 'xxxxxx'
access_token = 'xxxxxx'
access_token_secret = 'xxxxxx'
- We then define a function called ‘create_kinesis_client,’ which uses the boto3 library to create an instance of the Kinesis client. The Kinesis client will provide a low-level interface to work with Amazon Kinesis.
def create_kinesis_client():
    return boto3.client('kinesis')
- Next, we create a function called ‘stream_tweets,’ which will stream data from Twitter using the Tweepy library. The ‘on_data’ method will be used to handle the streaming data. Inside the function, we will authenticate using the consumer and access tokens obtained from the Twitter developer account page. Replace ‘my-stream-name’ with the name you gave your Kinesis stream.
def stream_tweets():
    kinesis_client = create_kinesis_client()

    class TweetStreamListener(tweepy.StreamListener):
        def on_data(self, data):
            # Tweepy delivers each tweet as a JSON string, so it can be
            # sent to Kinesis as-is without re-serializing it.
            kinesis_client.put_record(
                StreamName='my-stream-name',
                Data=data.encode('utf-8'),
                PartitionKey='my-partition-key'
            )
            return True

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    tweet_stream = tweepy.Stream(auth=auth, listener=TweetStreamListener())
    tweet_stream.filter(track=['data', 'technology'])
- Finally, run the ‘stream_tweets’ function to stream Twitter data to your Kinesis data stream.
if __name__ == '__main__':
    stream_tweets()
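One thing to note about the script above: it sends every record with a single fixed partition key, which routes all traffic to one shard. A common alternative is to derive the partition key from the tweet's author so records spread across shards. A minimal sketch (the helper name and field choices here are our own, not part of the tutorial's script):

```python
import hashlib
import json

def build_kinesis_record(tweet_json, stream_name):
    """Build the kwargs for kinesis_client.put_record.

    Hashing the tweet author's id yields a well-distributed partition
    key, instead of funnelling every record through one fixed key.
    """
    tweet = json.loads(tweet_json)
    user_id = str(tweet.get('user', {}).get('id_str', 'unknown'))
    return {
        'StreamName': stream_name,
        'Data': tweet_json.encode('utf-8'),
        'PartitionKey': hashlib.md5(user_id.encode()).hexdigest(),
    }

sample = json.dumps({'text': 'hello', 'user': {'id_str': '42'}})
record = build_kinesis_record(sample, 'my-stream-name')
print(record['PartitionKey'])
```

Kinesis hashes the partition key to choose a shard, so a key that varies per record is what lets a multi-shard stream actually use all of its shards.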
Step 3 – Create a Kinesis data processing pipeline with AWS Lambda
In this step, we will create a Kinesis data processing pipeline using AWS Lambda to process the streaming data from Kinesis. We will create a Lambda function that receives the streaming data, processes it by converting the tweet text to upper case, and saves the processed data to an S3 bucket. Follow the steps below:
- Open the AWS Management Console and navigate to the AWS Lambda service.
- Click on ‘Create Function’ and choose the ‘Author from scratch’ option.
- Give your function a name and choose a recent Python 3 runtime.
- In the ‘Designer’ section, click on the ‘Add Trigger’ button and choose the ‘Kinesis’ option. Select the Kinesis data stream we created in Step 1.
- In the ‘Function code’ section, replace the default code with the code below. Kinesis delivers each record payload base64-encoded, so the function decodes each record, upper-cases the tweet text, and saves the result to an S3 bucket.
import base64
import json
import boto3

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    print('Received event: ' + json.dumps(event))
    bucket = 'my-bucket-name'
    for record in event['Records']:
        # Kinesis delivers record payloads base64-encoded.
        payload = base64.b64decode(record['kinesis']['data'])
        text = json.loads(payload)['text']
        capitalized_text = text.upper()
        # Key each object by the record's sequence number so
        # records do not overwrite one another.
        key = record['kinesis']['sequenceNumber'] + '.txt'
        s3.Bucket(bucket).put_object(Key=key, Body=capitalized_text)
    return {
        'statusCode': 200,
        'body': json.dumps('Data processed successfully')
    }
- In the ‘Basic settings’ section, set the ‘timeout’ to 1 minute and click ‘Save.’
- Add the ‘AmazonS3FullAccess’ managed policy to the Lambda execution role.
- Run the ‘streaming-data-to-kinesis.py’ script we created in Step 2 to test the pipeline.
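Before deploying, you can sanity-check the transform locally by hand-building an event shaped the way Lambda delivers Kinesis records. This sketch isolates the decode-and-uppercase step (no AWS calls; the S3 write is replaced by a return value):

```python
import base64
import json

def process_record(kinesis_record):
    """Mirror the Lambda's transform: decode the base64 payload,
    parse the tweet JSON, and return the upper-cased text."""
    payload = base64.b64decode(kinesis_record['kinesis']['data'])
    tweet = json.loads(payload)
    return tweet['text'].upper()

# Build a fake Kinesis event the way Lambda would deliver it.
tweet_bytes = json.dumps({'text': 'streaming with kinesis'}).encode()
event = {'Records': [{'kinesis': {'data': base64.b64encode(tweet_bytes).decode()}}]}
print(process_record(event['Records'][0]))  # STREAMING WITH KINESIS
```

Forgetting the base64 decode is the most common mistake with Kinesis-triggered Lambdas; json.loads will fail on the raw payload.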
Congratulations, you have successfully created a pipeline to stream social media data with Amazon Kinesis. You can further enhance the pipeline to perform analytics and insights on the streaming data.