In the world of modern data analytics and processing, the diversity of data sources available for ingestion spans a wide spectrum. From traditional file formats such as CSV, JSON, and XML to robust database systems encompassing both SQL and NoSQL variants, the landscape expands further to include dynamic APIs such as REST, facilitating real-time data retrieval. Message queues such as Kafka offer scalable solutions for handling event-driven data while streaming services such as Kinesis and pub/sub enable continuous data flows crucial for applications demanding immediate insights. Understanding and effectively harnessing these diverse data ingestion sources is fundamental to building robust data pipelines that support a broad array of analytical and operational needs.
Let’s start with event processing.
Event data processing solution
In a real-time processing system, data is ingested, processed, and responded to almost instantaneously, as we’ve discussed. Real-time processing systems often use message queues to handle incoming data streams and ensure that data is processed in the order it is received, without delays.
The following Python code demonstrates a basic example of using a message queue for processing messages, which is a foundational concept in both real-time and semi-real-time data processing systems. The Queue class from Python's queue module is used to create a queue—a data structure that follows the first-in, first-out (FIFO) principle. In this context, a queue is used to manage messages or tasks that need to be processed. The code simulates an event-based system where messages (in this case, strings such as message 0, message 1, etc.) are added to a queue. This mimics a scenario wherein events or tasks are generated and need to be processed in the order they arrive. Let's have a look at each part of the code. You can find the code file at https://github.com/PacktPublishing/Python-Data-Cleaning-and-Preparation-Best-Practices/blob/main/chapter01/4.work_with_queue.py:
- The read_message_queue() function initializes a queue object, q, using the Queue class from the queue module (imported at the top of the script):
from queue import Queue

def read_message_queue():
    q = Queue()
- This loop adds 10 messages to the queue. Each message is a string in the format message i, where i ranges from 0 to 9:
    for i in range(10):  # Mocking messages
        q.put(f"message {i}")
- This loop continuously retrieves and processes messages from the queue until it is empty. q.get() retrieves a message from the queue, and q.task_done() signals that the retrieved message has been processed:
    while not q.empty():
        message = q.get()
        process_message(message)
        q.task_done()  # Signal that the task is done
- The following function takes a message as input and prints it to the console, simulating the processing of the message:
def process_message(message):
    print(f"Processing message: {message}")
- Call the read_message_queue function:
read_message_queue()
Here, the read_message_queue function reads messages from the queue and processes them one by one using the process_message function. This demonstrates how event-based systems handle tasks—by placing them in a queue and processing each task as it becomes available.
The while not q.empty() loop ensures that each message is processed in the exact order it was added to the queue. This is crucial in many real-world applications where the order of processing matters, such as in handling user requests or processing logs.
The q.task_done() method signals that a message has been processed. This is important in real-world systems where tracking the completion of tasks is necessary for ensuring reliability and correctness, especially in systems with multiple workers or threads.
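To make the multi-worker point concrete, here is a minimal sketch (not from the book's repository, just an illustration of the standard library pattern) that combines queue.Queue with the threading module: two daemon worker threads consume from the same queue, q.task_done() marks each message as handled, and q.join() blocks until every message has been processed.
import threading
from queue import Queue

def worker(q):
    while True:
        message = q.get()  # blocks until a message is available
        print(f"Processing message: {message}")
        q.task_done()      # mark this message as handled

q = Queue()
for _ in range(2):  # two worker threads consuming from the same queue
    threading.Thread(target=worker, args=(q,), daemon=True).start()

for i in range(10):
    q.put(f"message {i}")

q.join()  # wait until every queued message has been marked done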
In real-world applications, message queues are often integrated into more sophisticated data streaming platforms to ensure scalability, fault tolerance, and high availability. For instance, in real-time data processing, platforms such as Kafka and AWS Kinesis come into play.
Ingesting event data with Apache Kafka
There are several technologies for ingesting and handling event data. One of the technologies we will discuss is Apache Kafka. Kafka is an open source distributed event streaming platform first developed by LinkedIn and later donated to the Apache Software Foundation. It is designed to handle large amounts of data in real time and provides a scalable and fault-tolerant system for processing and storing data streams.
Figure 1.1 – Components of Apache Kafka
Let’s see the different components of Apache Kafka:
- Ingestion: Data streams can be ingested into Kafka using Kafka producers. Producers are applications that write data to Kafka topics, which are logical channels that can hold and organize data streams.
- Processing: Kafka can process streams of data using Kafka Streams, a client library for building real-time stream processing applications. Kafka Streams allows developers to build custom stream-processing applications that can perform transformations, aggregations, and other operations on data streams.
- Storage: Kafka stores data streams in distributed, fault-tolerant clusters called Kafka brokers. Brokers store the data streams in partitions, which are replicated across numerous brokers for fault tolerance.
- Consumption: Data streams can be consumed from Kafka using Kafka consumers. Consumers are applications that read data from Kafka topics and process it as needed.
Several libraries can be used to interact with Apache Kafka in Python; we will explore the most popular ones in the next section.
Which library should you use for your use case?
Kafka-Python is a pure Python implementation of Kafka's protocol, offering a more Pythonic interface for interacting with Kafka. It is designed to be simple and easy to use, which makes it particularly appealing for beginners: it is easier to install and work with than other Kafka libraries. Kafka-Python is flexible and well suited for small to medium-sized applications, providing the essential features needed for basic Kafka operations without the complexity of additional dependencies. Its pure Python nature means that it does not rely on any external libraries beyond Python itself, streamlining the installation and setup process.
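To give a feel for the API, here is a minimal Kafka-Python sketch rather than a production setup; it assumes a broker is reachable at localhost:9092 and that a topic named events exists (both are illustrative values):
from kafka import KafkaProducer, KafkaConsumer  # pip install kafka-python

BOOTSTRAP = "localhost:9092"  # assumed local broker
TOPIC = "events"              # assumed topic name

# Produce a handful of messages, mirroring the earlier queue example
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
for i in range(10):
    producer.send(TOPIC, f"message {i}".encode("utf-8"))
producer.flush()  # ensure delivery before we start reading

# Consume them back; consumer_timeout_ms stops iteration when no new messages arrive
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for record in consumer:
    print(f"Processing message: {record.value.decode('utf-8')}")
consumer.close()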
Confluent-kafka-python is a library developed and maintained by Confluent, the original creator of Kafka. It stands out for its high-performance and low-latency capabilities, leveraging the librdkafka C library for efficient operations. The library offers extensive configuration options akin to the Java Kafka client and closely aligns with Kafka's feature set, often pioneering support for new Kafka features. It is particularly well suited for production environments where both performance and stability are crucial, making it an ideal choice for handling high-throughput data streams and ensuring reliable message processing in critical applications.
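For comparison, a similarly minimal confluent-kafka-python sketch is shown below, under the same assumptions (a local broker and an events topic); note its configuration-dictionary style, delivery callback, and explicit poll() call:
from confluent_kafka import Producer, Consumer  # pip install confluent-kafka

conf = {"bootstrap.servers": "localhost:9092"}  # assumed local broker

def delivery_report(err, msg):
    # Called once the broker acknowledges (or rejects) the message
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

producer = Producer(conf)
producer.produce("events", value=b"message 0", callback=delivery_report)
producer.flush()

consumer = Consumer({**conf, "group.id": "demo-group", "auto.offset.reset": "earliest"})
consumer.subscribe(["events"])
msg = consumer.poll(timeout=5.0)  # wait up to 5 seconds for a single message
if msg is not None and msg.error() is None:
    print(f"Processing message: {msg.value().decode('utf-8')}")
consumer.close()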
Transitioning from event data processing to databases involves shifting focus from real-time data streams to persistent data storage and retrieval. While event data processing emphasizes handling continuous streams of data in near real-time for immediate insights or actions, databases are structured repositories designed for storing and managing data over the long term.
Ingesting data from databases
Databases, whether relational or non-relational, serve as foundational components in data management systems. Classic databases and NoSQL databases are two different types of database management systems that differ in architecture and characteristics. A classic database, also known as a relational database, stores data in tables with a fixed schema. Classic databases are ideal for applications that require complex querying and transactional consistency, such as financial systems or enterprise applications.
On the other hand, NoSQL databases do not store data in tables with a fixed schema. They use a document-based approach to store data in a flexible schema format. They are designed to be scalable and handle large amounts of data, with a focus on high-performance data retrieval. NoSQL databases are well-suited for applications that require high performance and scalability, such as real-time analytics, content management systems, and e-commerce platforms.
Let’s start with relational databases.
Performing data ingestion from a relational database
Relational databases are useful for batch ETL processes where structured data from various sources needs consolidation, transformation, and loading into a data warehouse or analytical system. SQL-based operations are efficient for joining and aggregating data before processing. Let’s try to understand how SQL databases represent data in tables with rows and columns using a code example. We’ll simulate a basic SQL database interaction using Python dictionaries to represent tables and rows. You can see the full code example at https://github.com/PacktPublishing/Python-Data-Cleaning-and-Preparation-Best-Practices/blob/main/chapter01/5.sql_databases.py:
- We create a read_sql function that simulates reading rows from a SQL table, represented here as a list of dictionaries where each dictionary corresponds to a row in the table:
def read_sql():
    # Simulating a SQL table with a dictionary
    sql_table = [
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 24},
    ]
    for row in sql_table:
        process_row(row)
- The process_row function takes a row (dictionary) as input and prints its contents, simulating the processing of a row from a SQL table:
def process_row(row):
    print(f"Processing row: id={row['id']}, name={row['name']}, age={row['age']}")

read_sql()
- Let's print our SQL table in the proper format:
print(f"{'id':<5} {'name':<10} {'age':<3}")
print("-" * 20)
# Print each row
for row in sql_table:
    print(f"{row['id']:<5} {row['name']:<10} {row['age']:<3}")
This will print the following output:
id    name       age
--------------------
1     Alice      30
2     Bob        24
The key to learning from the previous example is understanding how SQL databases structure and manage data through tables composed of rows and columns, and how to efficiently retrieve and process these rows programmatically. This knowledge is crucial because it lays the foundation for effective database management and data manipulation in any application.
In real-world applications, this interaction is often facilitated by libraries and drivers such as Java Database Connectivity (JDBC) or Open Database Connectivity (ODBC), which provide standardized methods for connecting to and querying databases. These libraries are typically wrapped by higher-level frameworks or libraries in Python, making it easier for developers to ingest data from various SQL databases without worrying about the underlying connectivity details. Several libraries can be used to interact with SQL databases using Python; we will explore the most popular ones in the following section.
Which library should you use for your use case?
Let’s explore the different libraries available for interacting with SQL databases in Python, and understand when to use each one:
- SQLite (sqlite3) is ideal for small to medium-sized applications, local storage, and prototyping. Its zero-configuration, serverless architecture makes it perfect for lightweight, embedded database needs and quick development cycles. It is especially useful in scenarios where the overhead of a full-fledged database server is unnecessary. Avoid using sqlite3 for applications requiring high concurrency or extensive write operations, or where multiple users need to access the database simultaneously. It is not suitable for large-scale applications or those needing robust security features and advanced database functionalities. (A minimal sqlite3 ingestion sketch follows this list.)
- SQLAlchemy is suitable for applications requiring a high level of abstraction over raw SQL, support for multiple database engines, and complex queries and data models. It is ideal for large-scale production environments that need flexibility, scalability, and the ability to switch between different databases with minimal code changes. Avoid using SQLAlchemy for small, lightweight applications where the overhead of its comprehensive ORM capabilities is unnecessary. If you need direct, low-level access to a specific database’s features and are comfortable writing raw SQL queries, a simpler database adapter such as sqlite3, Psycopg2, or MySQL Connector/Python might be more appropriate.
- Psycopg2 is the go-to choice for interacting with PostgreSQL databases, making it suitable for applications that leverage PostgreSQL’s advanced features, such as ACID compliance, complex queries, and extensive data types. It is ideal for production environments requiring reliability and efficiency in handling PostgreSQL databases. Avoid using Psycopg2 if your application does not interact with PostgreSQL. If you need compatibility with multiple database systems or a higher-level abstraction, consider using SQLAlchemy instead. Also, it might not be the best choice for lightweight applications where the overhead of a full PostgreSQL setup is unnecessary.
- MySQL Connector/Python (mysql-connector-python) is great for applications that need to interact directly with MySQL databases. It is suitable for environments where compatibility and official support from Oracle are critical, as well as for applications leveraging MySQL's features such as transaction management and connection pooling. Do not use MySQL Connector/Python if your application requires compatibility with multiple database systems or a higher-level abstraction. For simpler applications where the overhead of a full MySQL setup is unnecessary, or where MySQL's features are not specifically needed, consider other lightweight alternatives.
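To ground the sqlite3 option, here is a small, self-contained sketch (an in-memory database with a hypothetical users table) that creates a table, inserts two rows, and then ingests and processes them much like the read_sql() mock shown earlier:
import sqlite3

# In-memory database for illustration; pass a file path instead for persistence
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
cursor.executemany(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    [(1, "Alice", 30), (2, "Bob", 24)],
)
conn.commit()

# Ingest the rows and process them one by one
for row in cursor.execute("SELECT id, name, age FROM users"):
    print(f"Processing row: id={row[0]}, name={row[1]}, age={row[2]}")
conn.close()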
After understanding the various libraries and their use cases for interacting with SQL databases, it’s equally important to explore alternatives for scenarios where the traditional relational model of SQL databases may not be the best fit. This brings us to NoSQL databases, which offer flexibility, scalability, and performance for handling unstructured or semi-structured data. Let’s delve into the key Python libraries for working with popular NoSQL databases and examine when and how to use them effectively.
Performing data ingestion from a NoSQL database
Non-relational databases can be used for storing and processing large volumes of semi-structured or unstructured data in batch operations. They are particularly effective when the schema can evolve or when handling diverse data types in a consolidated manner. NoSQL databases excel in streaming and semi-real-time workloads due to their ability to handle high throughput and low-latency data ingestion. They are commonly used for capturing and processing real-time data from IoT devices, logs, social media feeds, and other sources that generate continuous streams of data.
The provided Python code mocks a NoSQL database with a dictionary and processes each key-value pair. Let’s have a look at each part of the code:
- The process_entry function takes a key and its associated value from the data store and prints a formatted message showing the processing of that key-value pair. It provides a simple way to view or handle individual entries, highlighting how data is accessed and manipulated based on its key:
def process_entry(key, value):
    print(f"Processing key: {key} with value: {value}")
- The following function prints the entire data_store dictionary in a tabular format:
def print_data_store(data_store):
    print(f"{'Key':<5} {'Name':<10} {'Age':<3}")
    print("-" * 20)
    for key, value in data_store.items():
        print(f"{key:<5} {value['name']:<10} {value['age']:<3}")
It starts by printing column headers for Key, Name, and Age, followed by a separator line for clarity. It then iterates over all key-value pairs in the data_store dictionary, printing each entry's key, name, and age. This function helps visualize the current state of the data store. The initial state of the data is as follows:
Initial Data Store:
Key   Name       Age
--------------------
1     Alice      30
2     Bob        24
- This function adds a new entry to the data_store dictionary:
def create_entry(data_store, key, value):
    data_store[key] = value
    return data_store
It takes a key and a value, then inserts the value into data_store under the specified key. The updated data_store dictionary is then returned. This demonstrates the ability to add new data to the store, showcasing the creation aspect of Create, Read, Update, and Delete (CRUD) operations.
- The update_entry function updates an existing entry in the data_store dictionary:
def update_entry(data_store, key, new_value):
    if key in data_store:
        data_store[key] = new_value
    return data_store
It takes a key and new_value, and if the key exists in the data_store dictionary, it updates the corresponding value with new_value. The updated data_store dictionary is then returned. This illustrates how existing data can be modified, demonstrating the update aspect of CRUD operations.
- The following function removes an entry from the data_store dictionary:
def delete_entry(data_store, key):
    if key in data_store:
        del data_store[key]
    return data_store
It takes a key, and if the key is found in the data_store dictionary, it deletes the corresponding entry. The updated data_store dictionary is then returned.
- The following function ties the whole process together:
def read_nosql():
    data_store = {
        "1": {"name": "Alice", "age": 30},
        "2": {"name": "Bob", "age": 24},
    }
    print("Initial Data Store:")
    print_data_store(data_store)
    # Create: Adding a new entry
    new_key = "3"
    new_value = {"name": "Charlie", "age": 28}
    data_store = create_entry(data_store, new_key, new_value)
    # Read: Retrieving and processing an entry
    print("\nAfter Adding a New Entry:")
    process_entry(new_key, data_store[new_key])
    # Update: Modifying an existing entry
    update_key = "1"
    updated_value = {"name": "Alice", "age": 31}
    data_store = update_entry(data_store, update_key, updated_value)
    # Delete: Removing an entry
    delete_key = "2"
    data_store = delete_entry(data_store, delete_key)
    # Print the final state of the data store
    print("\nFinal Data Store:")
    print_data_store(data_store)
This code illustrates the core principles of NoSQL databases, including schema flexibility, key-value pair storage, and basic CRUD operations. It begins with the read_nosql() function, which simulates a NoSQL database using a dictionary, data_store, where each key-value pair represents a unique identifier and associated user information. Initially, the print_data_store() function displays the data in a tabular format, highlighting the schema flexibility inherent in NoSQL systems. The code then demonstrates CRUD operations. It starts by adding a new entry with the create_entry() function, showcasing how new data is inserted into the store. Following this, the process_entry() function retrieves and prints the details of the newly added entry, illustrating the read operation. Next, the update_entry() function modifies an existing entry, demonstrating the update capability of NoSQL databases. The delete_entry() function is used to remove an entry, showing how data can be deleted from the store. Finally, the updated state of the data_store dictionary is printed again, providing a clear view of how the data evolves through these operations.
- Let's execute the whole process:
read_nosql()
This returns the final data store (key 2 was deleted and Charlie was added under key 3):
Final Data Store:
Key   Name       Age
--------------------
1     Alice      31
3     Charlie    28
In the preceding example, we demonstrated an interaction with a mocked NoSQL system in Python to showcase the core principles of NoSQL databases, such as schema flexibility, key-value pair storage, and basic CRUD operations. With this, we can better grasp how NoSQL databases differ from traditional SQL databases in terms of data modeling and handling unstructured or semi-structured data efficiently.
There are several libraries that can be used to interact with NoSQL databases. In the next section, we will explore the most popular ones.
Which library should you use for your use case?
Let’s explore the different libraries available for interacting with NoSQL databases in Python, and understand when to use each one:
pymongo is the official Python driver for MongoDB, a popular NoSQL database known for its flexibility and scalability. pymongo allows Python applications to interact seamlessly with MongoDB, offering a straightforward API to perform CRUD operations, manage indexes, and execute complex queries. pymongo is particularly favored for its ease of use and compatibility with Python's data structures, making it suitable for a wide range of applications, from simple prototypes to large-scale production systems.
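For a flavor of the API, here is a minimal pymongo sketch; it assumes a MongoDB instance at localhost:27017, and the demo_db database and users collection names are purely illustrative. It mirrors the CRUD operations from the dictionary-based mock above:
from pymongo import MongoClient  # pip install pymongo

client = MongoClient("mongodb://localhost:27017")  # assumed local instance
collection = client["demo_db"]["users"]           # hypothetical database/collection

collection.insert_one({"_id": "3", "name": "Charlie", "age": 28})   # create
doc = collection.find_one({"_id": "3"})                             # read
collection.update_one({"_id": "1"}, {"$set": {"age": 31}})          # update
collection.delete_one({"_id": "2"})                                 # delete
print(f"Processing key: 3 with value: {doc}")
client.close()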
cassandra-driver (Cassandra): The cassandra-driver library provides Python applications with direct access to Apache Cassandra, a highly scalable NoSQL database designed for handling large amounts of data across distributed commodity servers. Cassandra's architecture is optimized for write-heavy workloads and offers tunable consistency levels, making it suitable for real-time analytics, IoT data, and other applications requiring high availability and fault tolerance.
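A comparable cassandra-driver sketch might look like this; it assumes a Cassandra node on 127.0.0.1 and an existing demo_ks keyspace with a users table, all of which are hypothetical names:
from cassandra.cluster import Cluster  # pip install cassandra-driver

cluster = Cluster(["127.0.0.1"])       # assumed local node
session = cluster.connect("demo_ks")   # hypothetical keyspace

# Parameterized insert, then read the rows back for processing
session.execute(
    "INSERT INTO users (id, name, age) VALUES (%s, %s, %s)",
    ("3", "Charlie", 28),
)
for row in session.execute("SELECT id, name, age FROM users"):
    print(f"Processing key: {row.id} with value: name={row.name}, age={row.age}")
cluster.shutdown()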
Transitioning from databases to file systems involves shifting the focus from structured data storage and retrieval mechanisms to more flexible and versatile storage solutions.
Performing data ingestion from cloud-based file systems
Cloud storage is a service model that allows data to be remotely maintained, managed, and backed up over the internet. It involves storing data on remote servers accessed from anywhere via the internet, rather than on local devices. Cloud storage has revolutionized the way we store and access data. It provides a flexible and scalable solution for individuals and organizations, enabling them to store large amounts of data without investing in physical hardware. This is particularly useful for ensuring that data is always accessible and can be shared easily.
Amazon S3, Microsoft Azure Blob Storage, and Google Cloud Storage are all cloud-based object storage services that allow you to store and retrieve files in the cloud. Cloud-based file systems are becoming increasingly popular for several reasons.
Firstly, they provide a flexible and scalable storage solution that can easily adapt to the changing needs of an organization. This means that as the amount of data grows, additional storage capacity can be added without the need for significant capital investment or physical infrastructure changes. Thus, it can help reduce capital expenditures and operational costs associated with maintaining and upgrading on-premises storage infrastructure.
Secondly, cloud-based file systems offer high levels of accessibility and availability. With data stored in the cloud, users can access it from anywhere with an internet connection, making it easier to collaborate and share information across different teams, departments, or locations. Additionally, cloud-based file systems are designed with redundancy and failover mechanisms, ensuring that data is always available even in the event of a hardware failure or outage. Finally, they provide enhanced security features to protect data from unauthorized access, breaches, or data loss. Cloud service providers typically have advanced security protocols, encryption, and monitoring tools to safeguard data and ensure compliance with data privacy regulations.
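As a concrete, deliberately minimal example of ingesting from one of these services, the following sketch uses boto3 (the AWS SDK for Python) to list and read objects from Amazon S3. The bucket name my-data-bucket, the prefix, and the sales.csv key are hypothetical, and AWS credentials are assumed to be configured in the environment:
import boto3  # pip install boto3

s3 = boto3.client("s3")
bucket = "my-data-bucket"   # hypothetical bucket
prefix = "2024/project_x/"  # folder-like prefix used to organize objects

# List the objects stored under the prefix
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get("Contents", []):
    print(f"Found object: {obj['Key']} ({obj['Size']} bytes)")

# Download one object's contents into memory for ingestion
obj = s3.get_object(Bucket=bucket, Key=f"{prefix}sales.csv")
raw_bytes = obj["Body"].read()
print(f"Read {len(raw_bytes)} bytes")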
Files in cloud-based storage systems are essentially the same as those on local devices, but they are stored on remote servers and accessed over the internet. However, how are these files organized in these cloud storage systems? Let’s discuss that next.
Organizing files in cloud storage systems
One of the primary methods of organizing files in cloud storage is by using folder structures, similar to local file systems. Users can create folders and subfolders to categorize and store files systematically. Let’s have a look at some best practices:
- Creating a logical and intuitive hierarchy that reflects how you work or how your projects are structured is essential. This involves designing a folder structure that mimics your workflow, making it easier to locate and manage files. For instance, you might create main folders for different departments, projects, or clients, with subfolders for specific tasks or document types. This hierarchical organization not only saves time by reducing the effort needed to find files but also enhances collaboration by providing a clear and consistent framework that team members can easily navigate.
- Using consistent naming conventions for folders and files is crucial for ensuring easy retrieval and maintaining order within your cloud storage. A standardized naming scheme helps avoid confusion, reduces errors, and speeds up the process of locating specific documents. For example, adopting a format such as YYYY-MM-DD_ProjectName_DocumentType can provide immediate context and make sorting and searching more straightforward. Consistent naming also facilitates automation and integration with other tools, as predictable file names can be more easily processed by scripts and applications.
- Grouping files by project or client is an effective way to keep related documents together and streamline project management. This method involves creating dedicated folders for each project or client, where all relevant files, such as contracts, communications, and deliverables, are stored.
- Many cloud storage systems allow tagging files with keywords or metadata, which significantly enhances file categorization and searchability. Tags are essentially labels that you can attach to files, making it easier to group and find documents based on specific criteria. Metadata includes detailed information, such as the author, date, project name, and file type, which provides additional context and aids in more precise searches. By using relevant tags and comprehensive metadata, you can quickly filter and locate files, regardless of their location within the folder hierarchy. This practice is particularly useful in large storage systems where traditional folder structures might become cumbersome.
From discussing cloud storage systems, the focus now shifts to exploring the capabilities and integration opportunities offered by APIs.
APIs
APIs have become increasingly popular in recent years due to their ability to enable seamless communication and integration between different systems and services. APIs provide developers with a standardized and flexible way to access data and functionality from other systems, allowing them to easily build new applications and services that leverage existing resources. APIs have become a fundamental building block for modern software development and are used across a broad range of industries and applications.
Now that we understand what APIs represent, let's move on to the requests Python library, which lets developers programmatically access and manipulate data from remote servers.
The requests library
When it comes to working with APIs in Python, the requests library is the go-to choice for making HTTP requests to APIs and other web services. It makes it easy to send HTTP/1.1 requests from Python, and it provides many convenient features for working with HTTP responses.
Run the following command to install the requests library:
pip install requests==2.32.3
Let’s have a quick look at how we can use this library:
- Import the requests library:
import requests
- Specify the API endpoint URL:
url = "https://jsonplaceholder.typicode.com/posts"
- Make a GET request to the API endpoint:
response = requests.get(url)
- Get the response content:
print(response.content)
Here, we're making a GET request to the API endpoint at https://jsonplaceholder.typicode.com/posts and storing the response object in the response variable. We can then print the response content using the content attribute of the response object. The requests library provides many other methods and features for making HTTP requests, including support for POST, PUT, DELETE, and other HTTP methods, as well as handling headers, cookies, redirects, and authentication.
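For instance, a POST request with a JSON body and a custom header could look like the following minimal sketch; https://httpbin.org/post simply echoes back what it receives, and the bearer token is a placeholder:
import requests

url = "https://httpbin.org/post"  # echo endpoint, for illustration only
payload = {"title": "hello", "body": "example payload"}
headers = {"Authorization": "Bearer <your-token>"}  # placeholder token

response = requests.post(url, json=payload, headers=headers, timeout=10)
print(response.status_code)
print(response.json())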
Now that we've explained the requests library, let's move on to a specific example of retrieving margarita cocktail data from the Cocktail DB API, which illustrates how practical web requests can be in accessing and integrating real-time data sources into applications.
Learn how to make a margarita!
This use case demonstrates retrieving cocktail data from the Cocktail DB API using Python. If you want to improve your bartending skills and impress your friends, you can use an open API to get real-time information on the ingredients required for any cocktail. For this, we will use the Cocktail DB API and the requests library to see which ingredients we need for a margarita:
- Define the API endpoint URL. We are making a request to the Cocktail DB API endpoint to search for cocktails with the name margarita:
url = "https://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita"
- Make the API request. We define the API endpoint URL as a string and pass it to the requests.get() function to make the GET request:
response = requests.get(url)
- Check whether the request was successful (status code 200) and get the data. The API response is returned as JSON, which we can extract by calling the response.json() method. We then assign this JSON data to a variable called data. Note that loading the result into a DataFrame requires pandas, imported here as pd:
import pandas as pd

if response.status_code == 200:
    # Extract the response JSON data
    data = response.json()
    # Check if the API response contains cocktails data
    if 'drinks' in data:
        # Create DataFrame from drinks data
        df = pd.DataFrame(data['drinks'])
        # Print the resulting DataFrame
        print(df.head())
    else:
        print("No drinks found.")
- If the request was not successful, print this error message:
else:
    print(f"Failed to retrieve data from API. Status code: {response.status_code}")
You can replace the margarita search parameter with any other cocktail name or ingredient to get data for different drinks.
With this, we come to the end of our first chapter. Let’s summarize what we have learned so far.