AWS Athena: GEO IP Lookups
Suppose you want to analyze web traffic properly. A lot of the time, you want to know the geographic origin of the requests. It is possible to map IP addresses to locations using MaxMind GeoIP tables. MaxMind provides a free version of their Lite tables that they update weekly.
In this post, we create a Lambda function that runs on a schedule every 7 days (weekly). This function downloads the latest tables (CSV files) from MaxMind and uploads them to S3. We then use these CSV files as sources for AWS Athena tables, so that we can implement GeoIP lookups in SQL.
Sign Up for a MaxMind account
To be able to use the GeoLite2 databases, you need to sign up for a MaxMind account. After you sign up, under Services you can find “My License Key”; generate a new one (I generated it in the new format at the time of posting this). Save this license key. We use the GeoLite2-City-CSV edition, which we can use as a source for AWS Athena tables. You can read more about this format at GeoIP2 City and Country CSV Databases.
Test the newly generated license key and look at the CSV files by using this curl command:
curl -o GeoLite2-City-CSV.zip \
'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key={{YOUR_LICENSE_KEY}}&suffix=zip'
Check the latest documentation on how to download the GeoIP databases.
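If you prefer Python over curl for this quick check, here is a minimal sketch (my own, using only the standard library) that downloads the archive from the same URL and lists the CSV files inside; YOUR_LICENSE_KEY is a placeholder you need to replace:
import urllib.request
import zipfile

# same download endpoint as the curl command above
url = (
    'https://download.maxmind.com/app/geoip_download'
    '?edition_id=GeoLite2-City-CSV&license_key=YOUR_LICENSE_KEY&suffix=zip'
)

# download the archive to the current directory
urllib.request.urlretrieve(url, 'GeoLite2-City-CSV.zip')

# list the files inside, so you can see the CSVs we will use later
with zipfile.ZipFile('GeoLite2-City-CSV.zip') as z:
    for name in z.namelist():
        print(name)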
Lambda function to upload GeoIP tables to S3
I have created a bucket s3://app.loshadki.data, and I am planning to store my databases under:
- s3://app.loshadki.data/data/geoip_blocks/data.csv.gz - CSV file containing data on IPv4 address blocks
- s3://app.loshadki.data/data/geoip_locations/data.csv.gz - CSV file containing location data
Create a new Lambda function. I call it GeoIP-Table-Update and use the python3.8 runtime.
Add environment variables:
- MAXMIND_GEOIP_LICENSE - your license key from MaxMind
- S3_BUCKET_NAME - S3 bucket name, I use app.loshadki.data
- S3_BUCKET_PREFIX - prefix for the databases, I use data
This function will be long-running, so in Basic settings change the Timeout to 5 minutes. I have also changed the memory to 256 MB. With more memory you also get more CPU power, so it can be cheaper to run the function with more memory even if you do not use all of it. We run this function only once a week, so the change to your bill is minimal.
To run it every week, add a new trigger, select EventBridge (CloudWatch Events), and create a new rule upload-geoip-to-s3-weekly with the schedule expression rate(7 days).
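If you prefer to script the trigger instead of clicking through the console, the rough boto3 sketch below creates the same rule and wires it to the function; the statement id and target id are placeholders I chose for the example:
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# look up the ARN of the function we created above
function_arn = lambda_client.get_function(
    FunctionName='GeoIP-Table-Update'
)['Configuration']['FunctionArn']

# create (or update) the weekly rule
rule_arn = events.put_rule(
    Name='upload-geoip-to-s3-weekly',
    ScheduleExpression='rate(7 days)',
    State='ENABLED',
)['RuleArn']

# allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName='GeoIP-Table-Update',
    StatementId='upload-geoip-to-s3-weekly',  # arbitrary unique id
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn,
)

# point the rule at the function
events.put_targets(
    Rule='upload-geoip-to-s3-weekly',
    Targets=[{'Id': 'geoip-table-update', 'Arn': function_arn}],
)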
To be able to write to your S3 bucket, go to the Permissions tab, open the autogenerated role, and add an inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::app.loshadki.data/data/*"
        }
    ]
}
Now copy-paste the code below, deploy, and test it. It will take a few minutes to run. On success, you should see the files uploaded to your S3 location.
Code of Lambda Function
import os
import os.path
import urllib.request
import shutil
import zipfile
import tempfile
import gzip

import boto3


def lambda_handler(event, context):
    with tempfile.TemporaryDirectory() as tmpdirname:
        zipfilename = os.path.join(tmpdirname, 'GeoLite2-City-CSV.zip')

        print('step 1 - download geolite ip database')
        download_geo_ip(tmpdirname, zipfilename)

        print('step 2 - unzip all files')
        unzip_all(tmpdirname, zipfilename)

        print('step 3 - gzip files')
        gzip_files(tmpdirname)

        print('step 4 - upload to s3')
        upload_to_s3(tmpdirname)

    return


def download_geo_ip(tmpdirname, zipfilename):
    geoip_url = (
        'https://download.maxmind.com/app/geoip_download'
        '?edition_id=GeoLite2-City-CSV&license_key={}&suffix=zip'
    ).format(os.getenv('MAXMIND_GEOIP_LICENSE'))
    with urllib.request.urlopen(geoip_url) as response, open(zipfilename, 'wb') as output:
        shutil.copyfileobj(response, output)


def unzip_all(tmpdirname, zipfilename):
    # unzip all files, but without the directories, to easily find the files
    with zipfile.ZipFile(zipfilename, 'r') as z:
        for member in z.namelist():
            filename = os.path.basename(member)
            # if a directory, skip
            if not filename:
                continue
            # copy file (taken from zipfile's extract)
            with z.open(member) as zobj:
                with open(os.path.join(tmpdirname, filename), "wb") as targetobj:
                    shutil.copyfileobj(zobj, targetobj)


def gzip_files(tmpdirname):
    # compress the two CSV files we need before uploading them to S3
    for filename in ['GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Locations-en.csv']:
        file_path = os.path.join(tmpdirname, filename)
        with open(file_path, 'rb') as f_in, gzip.open(file_path + '.gz', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


def upload_to_s3(tmpdirname):
    s3_bucket_name = os.getenv('S3_BUCKET_NAME')
    s3_bucket_prefix = os.getenv('S3_BUCKET_PREFIX')

    s3_client = boto3.client('s3')
    s3_client.upload_file(
        os.path.join(tmpdirname, 'GeoLite2-City-Blocks-IPv4.csv.gz'),
        s3_bucket_name,
        os.path.join(s3_bucket_prefix, 'geoip_blocks/data.csv.gz')
    )
    s3_client.upload_file(
        os.path.join(tmpdirname, 'GeoLite2-City-Locations-en.csv.gz'),
        s3_bucket_name,
        os.path.join(s3_bucket_prefix, 'geoip_locations/data.csv.gz')
    )
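As a quick local smoke test (not part of the deployment), you can import the handler and call it with an empty event, assuming you saved the code as lambda_function.py and exported the three environment variables in your shell:
# local smoke test for the handler above (assumes the file is saved as lambda_function.py
# and MAXMIND_GEOIP_LICENSE, S3_BUCKET_NAME, S3_BUCKET_PREFIX are set in the environment)
from lambda_function import lambda_handler

if __name__ == '__main__':
    lambda_handler({}, None)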
Create Athena tables
We do all the operations under the default database.
Go to AWS Athena. We will define the table for the IPv4 blocks with LazySimpleSerDe.
CREATE EXTERNAL TABLE IF NOT EXISTS default.geoip_blocks (
network STRING,
geoname_id INT,
registered_country_geoname_id INT,
represented_country_geoname_id INT,
is_anonymous_proxy INT,
is_satellite_provider INT,
postal_code STRING,
latitude DOUBLE,
longitude DOUBLE,
accuracy_radius INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://app.loshadki.data/data/geoip_blocks/'
TBLPROPERTIES ('skip.header.line.count'='1');
And the table for locations with OpenCSVSerDe, as this file has quoted fields.
CREATE EXTERNAL TABLE IF NOT EXISTS default.geoip_locations (
geoname_id INT,
locale_code STRING,
continent_code STRING,
continent_name STRING,
country_iso_code STRING,
country_name STRING,
subdivision_1_iso_code STRING,
subdivision_1_name STRING,
subdivision_2_iso_code STRING,
subdivision_2_name STRING,
city_name STRING,
metro_code STRING,
time_zone STRING,
is_in_european_union INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '\"',
'escapeChar' = '\\'
)
LOCATION 's3://app.loshadki.data/data/geoip_locations/'
TBLPROPERTIES ('skip.header.line.count'='1');
Run a simple query to verify that the tables are created and that you have access to them:
select *
from default.geoip_blocks t1
inner join default.geoip_locations t2 on t1.geoname_id = t2.geoname_id
limit 10
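If you want to run this check from code rather than the Athena console, a minimal boto3 sketch could look like this; the OutputLocation for query results is a placeholder assumption, so use your own bucket:
import boto3

athena = boto3.client('athena')

# run the verification query programmatically
response = athena.start_query_execution(
    QueryString="""
        select *
        from default.geoip_blocks t1
        inner join default.geoip_locations t2 on t1.geoname_id = t2.geoname_id
        limit 10
    """,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://app.loshadki.data/athena-results/'},  # placeholder
)
print(response['QueryExecutionId'])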
Converting tables for CIDR lookups
As you can see, the table geoip_blocks defines blocks using CIDR notation, like 1.0.0.0/24, which means that the block includes all IP addresses from 1.0.0.0 to 1.0.0.255.
The current version of Presto does support CIDR lookups, but the latest version of the Athena engine (v2) does not support them, as it is based on Presto 0.217. We have to convert these CIDR representations to something else.
One way is to use an integer representation of the IP addresses, so that we can run queries like ip_start <= ip_address <= ip_end.
Converting an IP address to an integer is easy, we just need to compute ipv4[1]*256*256*256 + ipv4[2]*256*256 + ipv4[3]*256 + ipv4[4]. We also need to convert the prefix length (the /24 part) to the last IPv4 address of the range.
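To make the arithmetic concrete, here is a small Python illustration (not part of the Lambda function) that converts a CIDR block into the same ip_start/ip_end pair that the view below produces, cross-checked with the standard ipaddress module:
import ipaddress

def cidr_to_int_range(cidr):
    # split '1.0.0.0/24' into the address part and the prefix length
    address, prefix = cidr.split('/')
    octets = [int(o) for o in address.split('.')]
    prefix = int(prefix)

    # ip_start: the network address as a 32-bit integer
    ip_start = octets[0] * 256**3 + octets[1] * 256**2 + octets[2] * 256 + octets[3]

    # ip_end: set all of the (32 - prefix) host bits to 1
    ip_end = ip_start | ((1 << (32 - prefix)) - 1)
    return ip_start, ip_end

start, end = cidr_to_int_range('1.0.0.0/24')
print(start, end)  # 16777216 16777471

# cross-check with the ipaddress module
net = ipaddress.ip_network('1.0.0.0/24')
assert start == int(net.network_address)
assert end == int(net.broadcast_address)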
The easiest way is to create a view on top of geoip_blocks:
CREATE OR REPLACE VIEW geoip_blocks_int AS
select
cast(ip[1] as BIGINT)*256*256*256 + cast(ip[2] as BIGINT)*256*256 + cast(ip[3] as BIGINT)*256 + cast(ip[4] as BIGINT) as ip_start,
(
bitwise_or(cast(ip[1] as BIGINT), bitwise_and(255, cast(power(2, greatest(8 - range, 0)) as BIGINT)-1))
)*256*256*256 +
(
bitwise_or(cast(ip[2] as BIGINT), bitwise_and(255, cast(power(2, greatest(16 - range, 0)) as BIGINT)-1))
)*256*256 +
(
bitwise_or(cast(ip[3] as BIGINT), bitwise_and(255, cast(power(2, greatest(24 - range, 0)) as BIGINT)-1))
)*256+
(
bitwise_or(cast(ip[4] as BIGINT), bitwise_and(255, cast(power(2, greatest(32 - range, 0)) as BIGINT)-1))
) as ip_end,
network,
geoname_id,
registered_country_geoname_id,
represented_country_geoname_id,
cast(is_anonymous_proxy as BOOLEAN) as is_anonymous_proxy,
cast(is_satellite_provider as BOOLEAN) as is_satellite_provider,
postal_code,
latitude,
longitude,
accuracy_radius
from
(
select
network,
geoname_id,
registered_country_geoname_id,
represented_country_geoname_id,
is_anonymous_proxy,
is_satellite_provider,
postal_code,
latitude,
longitude,
accuracy_radius,
split(network_array[1], '.') as ip,
cast(network_array[2] as BIGINT) as range
from
(
select
network,
geoname_id,
registered_country_geoname_id,
represented_country_geoname_id,
is_anonymous_proxy,
is_satellite_provider,
postal_code,
latitude,
longitude,
accuracy_radius,
split(network, '/') as network_array
from default.geoip_blocks
)
)
With this query, we parse the network field, convert it into the fields ip_start and ip_end, and create a new view, default.geoip_blocks_int, from the result.
Testing time
SQL Example with one IP
The test table that we create in memory contains only one IP address, 1.1.1.1, but it shows how quickly you can convert an IP to an integer.
with ips as (
select
(
cast(ip_array[1] as BIGINT)*256*256*256 +
cast(ip_array[2] as BIGINT)*256*256 +
cast(ip_array[3] as BIGINT)*256 +
cast(ip_array[4] as BIGINT)
) as ip_int,
ip
from (
select
'1.1.1.1' as ip,
split('1.1.1.1', '.') as ip_array
) as source
)
select
ips.ip,
locations.continent_name,
locations.country_name,
locations.city_name,
locations.time_zone
from
ips as ips
left join geoip_blocks_int as blocks on blocks.ip_start <= ips.ip_int and ips.ip_int <= blocks.ip_end
left join geoip_locations as locations on blocks.geoname_id = locations.geoname_id
SQL Example with CloudFront access logs
with access_logs as (
select
uri,
(
cast(split(ip, '.')[1] as BIGINT)*256*256*256 +
cast(split(ip, '.')[2] as BIGINT)*256*256 +
cast(split(ip, '.')[3] as BIGINT)*256 +
cast(split(ip, '.')[4] as BIGINT)
) as ip_int
from (
select uri,
case xforwarded_for
when '-' then request_ip
else xforwarded_for
end as ip
from access_logs_yesterday
where
sc_content_type = 'text/html'
and status = 200
and method = 'GET'
and not regexp_like(url_decode(user_agent), '(bot|spider)')
)
)
select
count(*) as count,
access_logs.uri as uri,
locations.continent_name,
locations.country_name,
locations.city_name,
locations.time_zone
from
access_logs
left join geoip_blocks_int as blocks on
blocks.ip_start <= access_logs.ip_int and access_logs.ip_int <= blocks.ip_end
left join geoip_locations as locations on blocks.geoname_id = locations.geoname_id
group by 2, 3, 4, 5, 6
order by 1
What is next?
You can use the postal_code or city_name and country_name in AWS QuickSight reports to visualize data on maps.
I also have a CloudWatch alarm that tells me if any Lambda function fails more than 2 times in a selected period, just to verify that my functions work as expected. Hope that is useful!
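For reference, such an alarm can be created with a rough boto3 sketch like the one below; the alarm name, the one-day period, and the SNS topic ARN are placeholders I made up, so adjust them to your setup:
import boto3

cloudwatch = boto3.client('cloudwatch')

# alarm if the GeoIP-Table-Update function reports more than 2 errors in a day
cloudwatch.put_metric_alarm(
    AlarmName='lambda-geoip-table-update-errors',  # placeholder name
    Namespace='AWS/Lambda',
    MetricName='Errors',
    Dimensions=[{'Name': 'FunctionName', 'Value': 'GeoIP-Table-Update'}],
    Statistic='Sum',
    Period=86400,  # one day, in seconds
    EvaluationPeriods=1,
    Threshold=2,
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:alerts'],  # placeholder SNS topic
)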
In the next post, I will show how you can export AWS Amplify access logs to S3 every hour and use this data in Athena to run analytics reports. And we will use GeoIP lookups for them as well.