در این بخش، شرکتکنندگان با مفاهیم پایهای مهندسی داده و اصول کار با کلان دادهها آشنا میشوند. این دوره بهعنوان پایهای برای درک عمیقتر فناوریهای مدرن داده طراحی شده و شامل مباحث نظری و عملی است.
CSV، Parquet، و Avro هستند. بهینهسازی این دادهها با استفاده از کدکهای فشردهسازی مانند Snappy (سرعت بالا) و Gzip (فشردهسازی قویتر) برای کاهش حجم و افزایش سرعت انتقال بررسی میشود.JSON، XML، و YAML. این دادهها برای ذخیرهسازی اطلاعات پیچیده مانند لاگهای سیستمی یا دادههای API مناسب هستند.JSON به Parquet با استفاده از Python و fastparquet برای بهینهسازی ذخیرهسازی:
import pandas as pd
import json
# خواندن داده JSON
with open('data.json') as f:
data = json.load(f)
df = pd.DataFrame(data)
# ذخیرهسازی به صورت Parquet
df.to_parquet('data.parquet', compression='snappy')
ETL (استخراج، تبدیل، بارگذاری) و ELT (استخراج، بارگذاری، تبدیل) با ابزارهایی مانند Apache NiFi برای جمعآوری دادههای جریانی و Apache Airflow برای مدیریت جریانهای کاری بررسی میشوند. تفاوت معماریهای Lambda (ترکیب پردازش دستهای و جریانی) و Kappa (تمرکز بر پردازش جریانی) با مثالهای واقعی از پروژههای صنعتی تشریح میشود.Flume، Logstash، یا AWS Kinesis برای جمعآوری دادهها از منابع مختلف.HDFS، AWS S3، یا Google Cloud Storage.Apache Spark، Databricks، یا Flink.REST، داشبوردهای BI مانند Tableau یا Power BI، یا گزارشهای خودکار.Flask برای ارائه دادههای پردازششده:
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/data')
def get_data():
data = {'results': pd.read_parquet('data.parquet').to_dict()}
return jsonify(data)
app.run()
Python با کتابخانههای Pandas، NumPy، PySpark، و Dask برای پردازش دادههای بزرگ.MySQL، PostgreSQL، و NoSQL مانند MongoDB، Redis، و Elasticsearch.AWS، Azure، و GCP برای مدیریت زیرساختهای داده.PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DataProcessing').getOrCreate()
df = spark.read.csv('data.csv')
df.groupBy('category').sum('value').show()
CSV، تبدیل دادهها با استفاده از Pandas، و بارگذاری در پایگاه داده SQLite است. همچنین، بهینهسازی عملکرد با استفاده از Parquet و Snappy بررسی میشود:
import pandas as pd
import sqlite3
# Extract
df = pd.read_csv('input.csv')
# Transform
df['transformed_column'] = df['value'].apply(lambda x: x * 2)
df['category'] = df['category'].str.upper()
# Load to SQLite
conn = sqlite3.connect('output.db')
df.to_sql('results', conn, if_exists='replace', index=False)
# ذخیرهسازی بهینه با Parquet
df.to_parquet('output.parquet', compression='snappy')
ifconfig یا ip، استفاده از پروتکلهای TCP/IP، UDP، و HTTP/HTTPS، و مانیتورینگ ترافیک شبکه با Wireshark یا tcpdump است. مثال عملی: تنظیم یک آدرس IP استاتیک در لینوکس:
# تنظیم آدرس IP استاتیک با netplan
sudo nano /etc/netplan/01-netcfg.yaml
# محتوای فایل:
network:
version: 2
ethernets:
eth0:
addresses: [192.168.1.100/24]
gateway4: 192.168.1.1
nameservers:
addresses: [8.8.8.8, 8.8.4.4]
# اعمال تغییرات
sudo netplan apply
این بخش به آموزش جامع سیستمعامل لینوکس برای مهندسان داده میپردازد که برای مدیریت سرورهای دادهمحور و اجرای ابزارهای کلان داده ضروری است.
Ubuntu، CentOS، Debian، و Red Hat. پیکربندی کرنل لینوکس با استفاده از sysctl برای بهینهسازی عملکرد سرورهای دادهمحور. مدیریت سرویسها با systemd برای کنترل ابزارهایی مانند Apache Spark و Hadoop. مثال عملی: نصب و راهاندازی Apache Spark در Ubuntu:
sudo apt update
sudo apt install openjdk-11-jdk
wget https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
tar -xzf spark-3.4.0-bin-hadoop3.tgz
sudo mv spark-3.4.0-bin-hadoop3 /opt/spark
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin
Bash برای پردازش دادههای متنی بزرگ. دستورات کلیدی شامل grep، awk، sed، و cut. مدیریت پروسهها با ps aux، top، htop، و kill. مثال عملی: استخراج دادههای خاص از یک فایل لاگ:
# استخراج خطوط حاوی ERROR از فایل لاگ
grep ERROR /var/log/app.log > errors.log
# پردازش با awk برای استخراج ستونهای خاص
awk '{print $1, $3}' errors.log > processed_errors.log
ext4، XFS، Btrfs، و NFS. مدیریت دسترسیها با chmod، chown، و ACL برای امنیت دادهها. فشردهسازی و آرشیو با tar، gzip، و zip. مثال عملی: فشردهسازی و انتقال فایلهای داده:
# فشردهسازی دایرکتوری دادهها
tar -czvf data_backup_$(date +%F).tar.gz /data
# انتقال به سرور دیگر
scp data_backup_$(date +%F).tar.gz user@remote:/backup
netplan، nmcli، و ifconfig. مدیریت فایروال با ufw یا iptables برای محافظت از سرورهای داده. مانیتورینگ منابع با htop، vmstat، و iostat. مثال عملی: تنظیم فایروال برای دسترسی به Apache Spark:
# باز کردن پورت 4040 برای Spark UI
sudo ufw allow 4040/tcp
# بررسی وضعیت فایروال
sudo ufw status
Bash برای خودکارسازی وظایف مانند بکاپگیری، مانیتورینگ، و پردازش دادهها. زمانبندی وظایف با cron و anacron. مثال عملی: اسکریپت بکاپگیری خودکار:
#!/bin/bash
# بکاپگیری دادهها
BACKUP_DIR=/backup
DATA_DIR=/data
tar -czvf $BACKUP_DIR/backup_$(date +%F).tar.gz $DATA_DIR
# حذف بکاپهای قدیمیتر از 7 روز
find $BACKUP_DIR -name "*.tar.gz" -mtime +7 -delete
# زمانبندی با cron
echo "0 2 * * * /backup.sh" | crontab
Ubuntu، نصب و پیکربندی Apache Spark و HDFS، و اجرای یک Job ساده برای پردازش دادههای نمونه. شرکتکنندگان یک فایل CSV را در HDFS ذخیره کرده و با Spark پردازش میکنند:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Workshop').getOrCreate()
df = spark.read.csv('hdfs://localhost:9000/data/sample.csv')
df.groupBy('category').count().write.parquet('hdfs://localhost:9000/output')
این بخش به آموزش استفاده از Docker و Kubernetes برای مدیریت و استقرار خطوط لوله داده در محیطهای مقیاسپذیر میپردازد.
docker build، docker run، docker ps، و docker-compose. بهینهسازی ایمیجها با استفاده از Multi-Stage Builds برای کاهش حجم ایمیجها. مثال عملی: ساخت یک ایمیج سفارشی برای Apache Airflow:
# Dockerfile
FROM apache/airflow:2.7.0
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# requirements.txt
pandas==2.2.2
pyspark==3.4.0
# ساخت و اجرای ایمیج
docker build -t my-airflow .
docker run -p 8080:8080 my-airflow
Kubernetes با ابزار kubectl. تعریف موجودیتهای Pod، Deployment، Service، و Ingress برای مدیریت کانتینرهای دادهمحور. مقیاسپذیری خودکار با HPA و مدیریت منابع با ResourceQuotas. مثال عملی: استقرار یک Deployment در Kubernetes:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-deployment
spec:
replicas: 3
selector:
matchLabels:
app: airflow
template:
metadata:
labels:
app: airflow
spec:
containers:
- name: airflow
image: my-airflow:latest
ports:
- containerPort: 8080
# اعمال Deployment
kubectl apply -f deployment.yaml
Apache Airflow در یک کانتینر داکر و مدیریت آن در یک خوشه Kubernetes. شرکتکنندگان یک Dockerfile و docker-compose.yml برای اجرای Airflow و اتصال به PostgreSQL ایجاد میکنند:
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow:
image: my-airflow:latest
ports:
- "8080:8080"
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# اجرای سرویسها
docker-compose up -d
این بخش به آموزش مفاهیم پایه پایگاههای داده رابطهای با تمرکز بر PostgreSQL میپردازد.
ERD با ابزارهایی مانند pgAdmin یا DBDiagram. تعریف روابط یک به یک، یک به چند، و چند به چند با کلیدهای اصلی (Primary Key) و خارجی (Foreign Key). مثال عملی: طراحی جدول برای سیستم مدیریت انبار:
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10,2) NOT NULL
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
product_id INT REFERENCES products(id),
quantity INT NOT NULL,
order_date DATE NOT NULL
);
CREATE TABLE customers (
customer_id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
customer_id INT REFERENCES customers(customer_id),
product_id INT REFERENCES products(id),
quantity INT
);
SELECT، JOIN، SUBQUERY، UNION، و توابع تجمیعی (COUNT، SUM، AVG). مثال عملی: پرسوجوی تحلیلی برای گزارشگیری:
SELECT c.name, COUNT(o.order_id) AS order_count, SUM(o.quantity * p.price) AS total_revenue
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN products p ON o.product_id = p.id
GROUP BY c.name
HAVING COUNT(o.order_id) > 5
ORDER BY total_revenue DESC;
B-Tree، Hash، و GIN برای بهبود عملکرد پرسوجوها. مدیریت تراکنشها با ویژگیهای ACID. بهینهسازی پرسوجوها با EXPLAIN ANALYZE. مثال عملی: بهینهسازی یک پرسوجو:
CREATE INDEX idx_product_name ON products(name);
EXPLAIN ANALYZE SELECT * FROM products WHERE name = 'Laptop';
PostgreSQL برای یک سیستم مدیریت انبار. شرکتکنندگان جداول نرمالشده ایجاد کرده، دادههای نمونه وارد میکنند، و پرسوجوهای تحلیلی برای گزارشگیری اجرا میکنند. مثال عملی:
CREATE TABLE warehouse (
id SERIAL PRIMARY KEY,
product_id INT REFERENCES products(id),
stock INT NOT NULL,
last_updated TIMESTAMP
);
INSERT INTO warehouse (product_id, stock, last_updated)
VALUES (1, 100, NOW());
SELECT p.name, w.stock
FROM products p
JOIN warehouse w ON p.id = w.product_id
WHERE w.stock < 10;
این بخش به بررسی مفاهیم انبار داده و ابزارهای تحلیلی مانند Trino برای پردازش دادههای بزرگ میپردازد.
PostgreSQL، MySQL)، NoSQL (MongoDB، Cassandra)، و تحلیلی (Snowflake، BigQuery، Redshift) از نظر معماری، مقیاسپذیری، و کاربردهای تحلیلی و تراکنشی.Data Lake، Data Warehouse، و Data Mart. استفاده از فرمتهای ستونی مانند Parquet و ORC برای ذخیرهسازی بهینه و کاهش زمان پردازش. مثال عملی: ذخیرهسازی داده در Parquet با PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DataLake').getOrCreate()
df = spark.read.csv('data.csv')
df.write.parquet('s3a://bucket/data.parquet', compression='snappy')
Apache Spark و پردازش جریانی با Apache Kafka و Flink. مقایسه OLTP (پردازش تراکنشهای بلادرنگ) و OLAP (تحلیل دادههای بزرگ). مثال عملی: پردازش دستهای با Spark:
df = spark.read.parquet('s3a://bucket/data.parquet')
df.groupBy('region').agg({'sales': 'sum'}).show()
Star Schema و Snowflake Schema برای سازماندهی دادههای تحلیلی. مثال عملی: ایجاد یک Star Schema در PostgreSQL:
CREATE TABLE dim_product (
product_id SERIAL PRIMARY KEY,
name VARCHAR(100),
category VARCHAR(50)
);
CREATE TABLE fact_sales (
sale_id SERIAL PRIMARY KEY,
product_id INT REFERENCES dim_product(product_id),
date_id INT REFERENCES dim_date(date_id),
sales_amount DECIMAL(10,2)
);
MPP در ابزارهایی مانند Trino، Snowflake، و BigQuery برای اجرای پرسوجوهای مقیاسپذیر. مثال عملی: اجرای یک پرسوجوی Trino:
SELECT region, SUM(sales_amount) AS total_sales
FROM hive.default.sales_data
GROUP BY region
ORDER BY total_sales DESC;
Hive، MySQL، PostgreSQL، و S3. پیکربندی Trino برای اتصال به Data Lake. مثال عملی: اتصال Trino به S3:
# پیکربندی catalog در Trino
# فایل: etc/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=your_access_key
hive.s3.aws-secret-key=your_secret_key
# اجرای پرسوجو
SELECT * FROM hive.default.sales_data LIMIT 10;
Trino به یک Data Lake در S3، ذخیرهسازی دادهها در فرمت Parquet، و اجرای پرسوجوهای تحلیلی برای گزارشگیری. مثال عملی:
SELECT d.year, SUM(s.sales_amount) AS total_sales
FROM hive.default.sales_data s
JOIN hive.default.dim_date d ON s.date_id = d.date_id
GROUP BY d.year
ORDER BY total_sales DESC;
این بخش به آموزش پایگاههای داده پیشرفته NoSQL و تحلیلی مانند Cassandra و Clickhouse میپردازد.
Window Functions، CTE، و Recursive CTE برای پرسوجوهای پیچیده. مثال عملی: استفاده از Window Functions در PostgreSQL:
SELECT name, sales_amount,
RANK() OVER (PARTITION BY region ORDER BY sales_amount DESC) AS sales_rank
FROM sales_data;
CQL. تنظیم Consistency Levels (مانند QUORUM، ONE) و Replication Factor برای مقیاسپذیری و تحمل خطا. مثال عملی: ایجاد جدول در Cassandra:
CREATE KEYSPACE warehouse
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
CREATE TABLE warehouse.products (
id UUID PRIMARY KEY,
name TEXT,
price DECIMAL,
stock INT
);
INSERT INTO warehouse.products (id, name, price, stock)
VALUES (uuid(), 'Laptop', 1000.00, 50);
OLAP. استفاده از فشردهسازی داده و ایندکسهای Skip Index برای بهینهسازی. مثال عملی: ایجاد جدول و پرسوجو در Clickhouse:
CREATE TABLE sales (
sale_id UInt64,
product_id UInt32,
sale_date Date,
amount Float64
) ENGINE = MergeTree()
ORDER BY (sale_date, product_id);
SELECT product_id, SUM(amount) AS total
FROM sales
GROUP BY product_id
ORDER BY total DESC
LIMIT 10;
Cassandra برای تراکنشهای بلادرنگ و Clickhouse برای تحلیلهای تحلیلی. مثال عملی: انتقال داده از Cassandra به Clickhouse با Python:
from cassandra.cluster import Cluster
import clickhouse_driver
# اتصال به Cassandra
cluster = Cluster(['localhost'])
session = cluster.connect('warehouse')
rows = session.execute('SELECT * FROM products')
# اتصال به Clickhouse
client = clickhouse_driver.Client('localhost')
client.execute('INSERT INTO sales (sale_id, product_id, sale_date, amount) VALUES',
[(row.id, row.product_id, row.sale_date, row.amount) for row in rows])
این بخش به آموزش ابزارهای کلان داده مانند Hadoop، Spark، و Kafka برای پردازش و تحلیل دادههای بزرگ میپردازد.
HDFS و پردازش با MapReduce. مدیریت منابع با YARN و هماهنگی با Zookeeper. مثال عملی: اجرای یک Job MapReduce در Hadoop:
hadoop jar hadoop-examples.jar wordcount /input /output
Spark Core، Spark SQL، Spark Streaming، و MLlib. مثال عملی: پردازش داده با Spark SQL:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()
df = spark.read.parquet('hdfs://localhost:9000/data')
df.createOrReplaceTempView('sales')
spark.sql('SELECT region, SUM(sales) AS total FROM sales GROUP BY region').show()
Volume، Velocity، Variety، Veracity، و Value. مثال عملی: تحلیل دادههای بزرگ با Spark و Parquet.Sharding و Replication. مثال عملی: تنظیم Sharding در MongoDB:
sh.enableSharding('warehouse')
sh.shardCollection('warehouse.products', {'id': 'hashed'})
Apache Kafka و Spark Streaming. مقایسه معماریهای Lambda و Kappa. مثال عملی: پردازش جریانی با Spark Streaming:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('KafkaStream').getOrCreate()
df = spark.readStream.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'topic1') \
.load()
df.selectExpr('CAST(value AS STRING)').writeStream \
.format('console').start().awaitTermination()
Topic، Partition، و Consumer Groups. مثال عملی: ایجاد یک Topic در Kafka:
kafka-topics.sh --create --topic sales-data --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2
Spark Streaming و Kafka برای تحلیل دادههای بلادرنگ از حسگرهای IoT:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('IoTStream').getOrCreate()
df = spark.readStream.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'iot-data') \
.load()
df.selectExpr('CAST(value AS STRING) AS sensor_data') \
.writeStream.format('parquet') \
.option('path', 'hdfs://localhost:9000/iot-output') \
.option('checkpointLocation', '/checkpoint') \
.start().awaitTermination()
این بخش به آموزش مفاهیم CI/CD و ابزارهای مرتبط برای خودکارسازی فرآیندهای توسعه و استقرار خطوط لوله داده میپردازد.
SDLC شامل تحلیل نیازمندیها، طراحی، توسعه، تست، و استقرار. معرفی متدولوژیهای Waterfall و Agile. مثال عملی: تعریف یک پروژه در Jira برای مدیریت خط لوله داده.Agile و Scrum با استفاده از ابزارهای مدیریت پروژه مانند Jira، Trello، یا Asana. مثال عملی: تنظیم یک بورد Scrum در Jira برای تیم مهندسی داده.GitLab، تنظیم خطوط لوله CI/CD، و تحلیل کیفیت کد با SonarQube. مثال عملی: تنظیم یک خط لوله CI/CD در GitLab:
# .gitlab-ci.yml
stages:
- build
- test
- deploy
build_job:
stage: build
script:
- echo "Building data pipeline..."
- docker build -t my-data-pipeline .
test_job:
stage: test
script:
- echo "Running tests..."
- docker run my-data-pipeline pytest
deploy_job:
stage: deploy
script:
- echo "Deploying to Kubernetes..."
- kubectl apply -f deployment.yaml
CI/CD با ابزارهای داده مانند Apache Airflow، Spark، و Docker. مثال عملی: خودکارسازی استقرار Airflow با GitLab CI:
# .gitlab-ci.yml
deploy_airflow:
stage: deploy
script:
- docker push my-airflow:latest
- kubectl apply -f airflow-deployment.yaml
CI/CD در GitLab برای استقرار یک اپلیکیشن دادهمحور. شرکتکنندگان یک پروژه داده را با Docker و Kubernetes مستقر میکنند:
# .gitlab-ci.yml
stages:
- build
- test
- deploy
build_job:
stage: build
script:
- docker build -t my-data-app .
- docker push my-data-app:latest
test_job:
stage: test
script:
- docker run my-data-app pytest
deploy_job:
stage: deploy
script:
- kubectl apply -f k8s-deployment.yaml