How To Verify Data Quality On Tables Landed In An AWS Data Lake And Data Warehouse
5 Min Read
The Story
As a data engineer, I see ensuring data quality not just as a task but as a commitment to the integrity of the insights derived from the vast volumes of data we manage. The accuracy and dependability of your insights depend on the quality of the data you gather, and this is especially true when working with large datasets in Enterprise Data Lakes (EDL) and Enterprise Data Warehouses (EDW).
Ensuring that the tables you land on these platforms meet the highest data quality standards is a substantial challenge, but one that can be met with the right tools and techniques.
In this post, I'll detail how I verify data quality on tables landed in the AWS EDL and EDW using a simple Python script. The script automates several critical data validation checks, simplifying the process and empowering our data team to maintain and deliver high-quality data.
Understanding Data Quality
Data quality refers to the overall reliability, accuracy, completeness, and consistency of data within a dataset. It involves ensuring that the data meets predefined standards and is fit for its intended use. High-quality data is free from errors, duplications, and inconsistencies, making it a trustworthy foundation for making informed decisions, conducting analysis, and deriving meaningful insights. The significance of data quality lies in its direct impact on the reliability of business intelligence, analytics, and decision-making processes. Poor data quality can lead to flawed insights, operational inefficiencies, and misguided strategic decisions.
The How
Every morning, I would log into the AWS Console and run queries in Athena to verify that the tables had been ingested and landed successfully in the Data Lake (EDL) the previous night, and then run similar verification queries against our Redshift cluster via DBeaver to confirm that the tables had been exposed successfully. This mundane task became tedious over time, so I developed a Python script to automate the data validation process for both the EDL and the Data Warehouse (EDW).
The script provides a flexible and comprehensive solution for data validation in the AWS environment. It connects to AWS using either SSO (Single Sign-on) or traditional access key and secret credentials [Not Recommended]. It then dynamically iterates through a list of tables and databases specified in a YAML file, executing SQL queries against the EDL to perform a range of critical data validation checks:
- Table existence: Verifies that the table exists and that its column names in the EDL/EDW are as expected.
- Count per date: Verifies that the number of rows in the table is consistent with the expected number of rows for the date.
- Metadata uniqueness: Verifies that the metadata for the table is consistent and unique, ensuring that there are no duplicates.
- Extraction date validation: Verifies that the extraction date for the table is correct.
- Field-level consistency: Verifies that the values in the table are valid and consistent.
To ensure that the EDL and EDW stay in sync, my script also establishes a Redshift connection using Azure OAuth for secure authentication and replicates the same suite of validation checks on the EDW. This holistic approach helps guarantee data integrity and quality across both AWS environments.
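To make these checks concrete, here is a minimal sketch of the kind of SQL templates such checks can be built from. The queries and column names are illustrative assumptions (only meta_extract_date appears in the script's actual output later), not the exact statements the script runs.
# Hedged sketch: illustrative SQL templates for the checks listed above.
# Only "meta_extract_date" is taken from the script's real output; every
# other identifier here is an assumption for demonstration purposes.
VALIDATION_QUERIES = {
    "table_existence": (
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = '{database}' AND table_name = '{table}'"
    ),
    "count_per_date": (
        'SELECT meta_extract_date, COUNT(*) AS row_count '
        'FROM "{database}"."{table}" GROUP BY meta_extract_date'
    ),
    "metadata_uniqueness": (
        'SELECT {column}, COUNT(*) AS duplicates FROM "{database}"."{table}" '
        'GROUP BY {column} HAVING COUNT(*) > 1'
    ),
    "extraction_date": (
        'SELECT MAX(meta_extract_date) AS latest_extract '
        'FROM "{database}"."{table}"'
    ),
    "field_consistency": (
        'SELECT COUNT(*) AS null_ids FROM "{database}"."{table}" '
        'WHERE {column} IS NULL'
    ),
}

# Example: render the count-per-date check for a table from Config.yaml.
print(VALIDATION_QUERIES["count_per_date"].format(
    database="glue_database_name", table="table_name"
))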
The Walk-Through
In this section, I will walk through the script. First, we need to create a Config.yaml file in the project directory. This file defines the AWS services to be used, the connection configurations, and the table, database, and column names.
cat << EOF > Config.yaml
aws_service:
  region: eu-west-1
  sso_profile: $SSO_PROFILE
  credentials_path: $CREDENTIALS_PATH|null
  athena:
    tables:
      - name: table_name
        database: glue_database_name
        columns:
          - id
        max_expected_count: 30000000
  redshift:
    config:
      iam: true
      ssl: true
      database: database_name
      host: host
      cluster_identifier: cluster id
      user: user
      credentials_provider: 'BrowserAzureOAuth2CredentialsProvider'
      listen_port: port
      client_id: client_id
      idp_tenant: idp_tenant
      scope: scope_url
    tables:
      - name: table_name
        database: database
        columns:
          - id
EOF
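Since every later step reads this file, here is a minimal sketch of loading it with PyYAML (assuming the pyyaml package is installed); the ConfigurationLoader class introduced later can wrap exactly this call:
import yaml

def load_config(path: str = "Config.yaml") -> dict:
    """Read the YAML configuration into a plain dictionary."""
    with open(path, "r") as fh:
        return yaml.safe_load(fh)

config = load_config()
print(config["aws_service"]["athena"]["tables"][0]["name"])  # -> table_name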
Once we have the Config.yaml file, we can create the AWS_Data_Validation.py script, applying SOLID design principles.
The code below shows an AWS_Credentials class that provides methods for retrieving and reading AWS credentials from a configuration file (~/.aws/credentials).
# Imports used throughout AWS_Data_Validation.py
import configparser
import subprocess
import sys
import time
from pathlib import Path
from textwrap import dedent

import boto3
import redshift_connector
from botocore.exceptions import ClientError, UnauthorizedSSOTokenError


class AWS_Credentials:
    def __init__(
        self, sso_profile: str, credentials_path: str, region_name: str
    ) -> None:
        self.sso_profile = sso_profile
        self.credentials_path = credentials_path
        self.region_name = region_name

    def config_file_path(self) -> str:
        """
        If the sso_profile is not None, return the sso_profile.
        If the sso_profile is None, check if the credentials_path exists.
        If the credentials_path exists, return the credentials_path.
        If the credentials_path does not exist, raise a FileNotFoundError.
        :return: The path to the config file.
        """
        if self.sso_profile:
            return self.sso_profile
        elif Path(self.credentials_path).exists():
            return Path(self.credentials_path).as_posix()
        else:
            raise FileNotFoundError(
                f"AWS credentials file not found at {self.credentials_path}"
            )

    def read_credentials(self, **kwargs) -> dict:
        """
        `read_credentials` reads the credentials from a file and returns them
        as a dictionary.
        """
        try:
            if self.sso_profile:
                return {"sso_profile": self.sso_profile}
            else:
                config = configparser.RawConfigParser()
                creds_path = self.config_file_path()
                config.read(creds_path)
                section = kwargs.get("profile", config.sections()[0])
                return {
                    "aws_access_key_id": config.get(section, "aws_access_key_id"),
                    "aws_secret_access_key": config.get(
                        section, "aws_secret_access_key"
                    ),
                    "aws_session_token": config.get(section, "aws_session_token"),
                }
        except Exception:
            raise
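For example, with an SSO profile configured (the profile name below is illustrative), reading the credentials is a one-liner:
creds = AWS_Credentials(
    sso_profile="my-sso-profile",          # illustrative profile name
    credentials_path="~/.aws/credentials",
    region_name="eu-west-1",
).read_credentials()
print(creds)  # {'sso_profile': 'my-sso-profile'}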
Thereafter, we create another class, BotoSession, which inherits from AWS_Credentials and provides methods for creating a session, verifying the session, and returning a client object for an AWS service.
class BotoSession(AWS_Credentials):
    def __init__(
        self,
        sso_profile: str = "",
        credentials_path: str = "",
        region_name: str = "",
    ):
        self.sso_profile = sso_profile
        self.credentials_path = credentials_path
        self.region_name = region_name
        super().__init__(sso_profile, credentials_path, region_name)

    def session_client(self, client: str, **kwargs) -> object:
        """
        It reads the credentials from the file, creates a session,
        and returns a client object.
        :param client: The AWS service you want to use
        :type client: str
        :return: A boto3 client for the requested service.
        """
        creds = self.read_credentials(**kwargs)
        if creds.get("sso_profile"):
            session = boto3.Session(
                profile_name=creds.get("sso_profile"),
                region_name=self.region_name,
            )
        else:
            session = boto3.Session(
                aws_access_key_id=creds["aws_access_key_id"],
                aws_secret_access_key=creds["aws_secret_access_key"],
                aws_session_token=creds["aws_session_token"],
                region_name=self.region_name,
            )
        session = self.verified_session(session, **creds)
        return session.client(client)

    @staticmethod
    def verified_session(session: boto3.session.Session, **kwargs):
        # Confirm the session is valid by calling STS; fall back to an
        # SSO login prompt if the cached token has expired.
        sts = session.client("sts")
        try:
            sts.get_caller_identity()
            return session
        except ClientError:
            print(
                dedent(
                    """
                    Your Access Expired After 8 Hours.
                    ==================================
                    You Have To Update Your AWS Credentials File (typically located in
                    ~/.aws/credentials) With The Correct Credentials.
                    """
                )
            )
            sys.exit(1)
        except UnauthorizedSSOTokenError:
            subprocess.run(
                ["aws", "sso", "login", "--profile", kwargs.get("sso_profile")]
            )
            return session
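Usage is then straightforward (the profile name is illustrative):
boto_session = BotoSession(sso_profile="my-sso-profile", region_name="eu-west-1")
athena = boto_session.session_client("athena")  # STS-verified Athena client
s3 = boto_session.session_client("s3")          # STS-verified S3 client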
Once we have all the basic setup ready, we then create an AthenaClient
class that provides methods for executing queries on Amazon Athena and retrieving the query results for validation.
class AthenaClient:
    def __init__(self, boto_session, **kwargs):
        self.athena_client = boto_session.session_client("athena")
        self._s3_output_bucket = self._get_athena_s3_output_bucket(
            boto_session, **kwargs
        )

    def _get_athena_s3_output_bucket(self, boto_session: BotoSession, **kwargs):
        # Locate the S3 bucket (and prefix) where Athena query results are written.
        bucket_name_contains: str = kwargs.get("bucket_name_contains", "athena-output")
        bucket_object_prefix: str = kwargs.get("bucket_object_prefix", "cldedl")
        s3_client = boto_session.session_client("s3")
        try:
            buckets = s3_client.list_buckets()
        except ClientError:
            raise
        else:
            athena_output_bucket = [
                bucket["Name"]
                for bucket in buckets["Buckets"]
                if bucket_name_contains in bucket["Name"]
            ][0]
            list_objects = s3_client.list_objects(
                Bucket=athena_output_bucket, Delimiter="/"
            )
            sub_dir = [
                obj["Prefix"]
                for obj in list_objects["CommonPrefixes"]
                if bucket_object_prefix in obj["Prefix"]
            ]
            return Path(athena_output_bucket, sub_dir[0]).as_posix()

    def execute_query(self, query: str, max_executions=10):
        def _get_execution_status(execution_id: str) -> str:
            query_execution_stats = self.athena_client.get_query_execution(
                QueryExecutionId=execution_id
            )
            return query_execution_stats["QueryExecution"]["Status"]["State"]

        if not query:
            print("No query specified")
            return
        try:
            query_execution = self.athena_client.start_query_execution(
                QueryString=query,
                ResultConfiguration={
                    "OutputLocation": f"s3://{self._s3_output_bucket}"
                },
            )
            assert query_execution["ResponseMetadata"]["HTTPStatusCode"] == 200
        except self.athena_client.exceptions.InvalidRequestException as e:
            raise ConnectionError(
                f"Failed to issue: Start query execution command: Reason: {str(e)}"
            )
        else:
            # Poll the execution status until it succeeds, fails, or we time out.
            while max_executions > 0:
                max_executions -= 1
                exec_status = _get_execution_status(
                    query_execution["QueryExecutionId"]
                )
                if exec_status == "SUCCEEDED":
                    break
                elif exec_status in ["FAILED", "CANCELLED"]:
                    raise Exception(
                        "Athena query with the string "
                        f'"{query}" failed or was cancelled'
                    )
                time.sleep(1)
            if exec_status == "SUCCEEDED":
                try:
                    response = self.athena_client.get_query_results(
                        QueryExecutionId=query_execution["QueryExecutionId"]
                    )
                except Exception:
                    raise
                else:
                    return response["ResultSet"]["Rows"]
            return False
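With the Athena client in place, a validation query can be executed and its result rows inspected. The query below is illustrative; the real checks are driven by the tables and columns in Config.yaml:
athena_client = AthenaClient(boto_session)
rows = athena_client.execute_query(
    'SELECT COUNT(*) AS row_count FROM "glue_database_name"."table_name"'
)
if rows:
    # Athena returns the column headers as the first row.
    print(rows[1]["Data"][0]["VarCharValue"])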
We also create a RedShiftClient class that creates a connection to a Redshift database using the provided configuration. Future work includes converting RedShiftClient into a context manager so the connection is closed automatically when we are done; a rough sketch of that idea follows the class below.
class RedShiftClient:
    def __init__(self, config: dict):
        self.config = config

    def create_connection(self):
        conn: redshift_connector.Connection = redshift_connector.connect(
            database=self.config["database"],
            user=self.config["user"],
            host=self.config["host"],
            listen_port=self.config["listen_port"],
            client_id=self.config["client_id"],
            cluster_identifier=self.config["cluster_identifier"],
            credentials_provider=self.config["credentials_provider"],
            idp_tenant=self.config["idp_tenant"],
            scope=self.config["scope"],
            iam=self.config["iam"],
            ssl=self.config["ssl"],
        )
        conn.autocommit = True
        cursor = None
        try:
            cursor = conn.cursor()
            result = cursor.execute("SELECT 1")
            assert result.fetchone() == [1], "Could not establish connection!"
            return cursor
        except Exception:
            # Only close the cursor if it was actually created.
            if cursor is not None:
                cursor.close()
            raise
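As noted above, a future improvement is to turn RedShiftClient into a context manager so the cursor is closed automatically. A minimal sketch of that idea, reusing the same config dict, might look like this:
class ManagedRedShiftClient(RedShiftClient):
    """Hedged sketch: context-manager variant that closes the cursor on exit."""

    def __enter__(self):
        self.cursor = self.create_connection()
        return self.cursor

    def __exit__(self, exc_type, exc_value, traceback):
        # Closing the underlying connection as well would require
        # create_connection() to also keep a reference to it.
        self.cursor.close()

# Usage (config comes from the redshift.config block of Config.yaml):
# with ManagedRedShiftClient(config["aws_service"]["redshift"]["config"]) as cursor:
#     cursor.execute("SELECT 1")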
We then implement additional logic to execute queries against the preferred service, return the results, and run the validations. For brevity these are shown here as stubs; a rough sketch of how some of them could be filled in follows the block below.
class DataValidator:
    pass

class ConfigurationLoader:
    pass

def process_athena(aws_service: str):
    pass

def process_redshift(aws_service: str):
    pass

def validate_table(client: object, table_name: str, column: str, database: str, max_expected_count: int):
    pass

def arguments():
    pass
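How these are fleshed out will vary, but here is a rough sketch of arguments() and validate_table, based only on how main() and the command line below use them. ConfigurationLoader.load_config can simply wrap the yaml.safe_load call shown earlier; the count query and threshold logic are illustrative assumptions, not the exact checks from the production script.
import argparse


def arguments() -> argparse.Namespace:
    """Parse the -c/--config and --service flags consumed by main()."""
    parser = argparse.ArgumentParser(description="AWS EDL/EDW data validation")
    parser.add_argument("-c", "--config", required=True, help="Path to Config.yaml")
    parser.add_argument(
        "--service",
        choices=["athena", "redshift"],
        default="athena",
        help="Which service's tables to validate",
    )
    return parser.parse_args()


def validate_table(client: object, table_name: str, column: str, database: str, max_expected_count: int):
    """Illustrative check: the table's row count stays within expected bounds."""
    rows = client.execute_query(
        f'SELECT COUNT(*) FROM "{database}"."{table_name}"'
    )
    count = int(rows[1]["Data"][0]["VarCharValue"])  # row 0 holds the header
    assert 0 < count <= max_expected_count, (
        f"Table '{table_name}': count {count} is outside the expected range"
    )
    print(f"Table: '{table_name}': Table count validation passed.")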
The Script
Combining all of the classes, we end up with the following main
function that loads a configuration file, checks if a specific AWS service is configured, and then processes the service based on the provided arguments.
def main():
    args = arguments()
    config = ConfigurationLoader.load_config(args.config)
    service = args.service
    if config.get("aws_service"):
        aws_service = config["aws_service"]
        if service == "athena" and aws_service.get("athena"):
            process_athena(aws_service)
        elif service == "redshift" and aws_service.get("redshift"):
            process_redshift(aws_service)
        else:
            print(f"Service '{service}' not found in the configuration.")
    else:
        print("AWS service configuration not found in the configuration file.")


if __name__ == "__main__":
    main()
Running the Script
Before you run the tool, ensure that you can log in to AWS via SSO or AWS credentials. We set up SSO to avoid having to configure credentials each time the verification process needs to run.
Configure SSO using the following:
aws configure sso
After the configuration is complete, export the SSO profile name to an environment variable (the snippet below assumes a single profile entry in ~/.aws/config).
export SSO_PROFILE=$(cat ~/.aws/config | grep "\[profile " | sed -e 's/\[//g' -e 's/\]//g' -e 's/profile //g')
The aws configure sso step creates an SSO profile under ~/.aws/config, which will be used for all AWS authentication-related processes.
Once the SSO profile/credentials are configured, we can run the script:
python AWS_Data_Validation.py -c Config.yaml --service athena
Interpreting Results
$ python AWS_Data_Validation.py -c Config.yaml --service athena
Attempting to automatically open the SSO authorization page in your default browser.
If the browser does not open or you wish to use a different device to authorize this request, open the following URL:
https://device.sso.eu-west-1.amazonaws.com/
Then enter the code:
CNDB-LPRG
Successfully logged into Start URL: https://<id>.awsapps.com/start/
Table: 'table_name': Table exists.
Table: 'table_name': meta_extract_date is recent.
Table: 'table_name': Field-level validation passed.
Table: 'table_name': Metadata duplicates validation passed.
Table: 'table_name': Table count per day validation passed.
Conclusion
Automating these checks with a single Python script has turned a tedious morning routine of manual Athena and Redshift queries into a one-command verification step. Because the validation suite is driven by Config.yaml, new tables, databases, and columns can be added without touching the code, making it easier for the team to consistently maintain and deliver high-quality data across both the EDL and the EDW.