Write Your Own Flask SQLAlchemy Extension

When it comes to connecting to database in Flask project, we tend to use the Flask-SQLAlchemy extension that handles the lifecycle of database connection, add a certain of utilities for defining models and executing queries, and integrate well with the Flask framework. However, if you are developing a rather simple project with Flask and SQLAlchemy, and do not want to depend on another third-party library, or you prefer using SQLAlchemy directly, making the model layer agnostic of web frameworks, you can write your own extension. Besides, you will gain better type hints for SQLAlchemy model, and possibly easier migration to SQLAlchemy 2.x. This article will show you how to integrate SQLAlchemy 1.4 with Flask 2.1.

The alpha version

In the official document Flask Extension Development, it shows us writing a sqlite3 extension that plays well with Flask application context. So our first try is to replace sqlite3 with SQLAlchemy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from typing import Optional

from flask import Flask, current_app
from flask.globals import _app_ctx_stack, _app_ctx_err_msg
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


class SQLAlchemyAlpha:
def __init__(self, app: Optional[Flask] = None):
self.app = app
if app is not None:
self.init_app(app)

def init_app(self, app: Flask):
app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite://')
app.teardown_appcontext(self.teardown)

def connect(self) -> Engine:
return create_engine(current_app.config['SQLALCHEMY_DATABASE_URI'])

def teardown(self, exception) -> None:
ctx = _app_ctx_stack.top
if hasattr(ctx, 'sqlalchemy'):
ctx.sqlalchemy.dispose()

@property
def engine(self) -> Engine:
ctx = _app_ctx_stack.top
if ctx is not None:
if not hasattr(ctx, 'sqlalchemy'):
ctx.sqlalchemy = self.connect()
return ctx.sqlalchemy
raise RuntimeError(_app_ctx_err_msg)

Read More

OpenAPI Workflow with Flask and TypeScript

OpenAPI has become the de facto standard of designing web APIs, and there are numerous tools developed around its ecosystem. In this article, I will demonstrate the workflow of using OpenAPI in both backend and frontend projects.

OpenAPI 3.0

API Server

There are code first and design first approaches when using OpenAPI, and here we go with code first approach, i.e. writing the API server first, add specification to the method docs, then generate the final OpenAPI specification. The API server will be developed with Python Flask framework and apispec library with marshmallow extension. Let’s first install the dependencies:

1
2
3
4
5
6
7
Flask==2.1.2
Flask-Cors==3.0.10
Flask-SQLAlchemy==2.5.1
SQLAlchemy==1.4.36
python-dotenv==0.20.0
apispec[marshmallow]==5.2.2
apispec-webframeworks==0.5.2

Read More

Use Bootstrap V5 in Vue 3 Project

Bootstrap V5 and Vue 3.x have been released for a while, but the widely used BootstrapVue library is still based on Bootstrap V4 and Vue 2.x. A new version of BootstrapVue is under development, and there is an alternative project BootstrapVue 3 in alpha version. However, since Bootstrap is mainly a CSS framework, and it has dropped jQuery dependency in V5, it is not that difficult to integrate into a Vue 3.x project on your own. In this article, we will go through the steps of creating such a project.

Create Vite project

The recommended way of using Vue 3.x is with Vite. Install yarn and create from the vue-ts template:

1
2
3
4
yarn create vite bootstrap-vue3 --template vue-ts
cd bootstrap-vue3
yarn install
yarn dev

Add Bootstrap dependencies

Bootstrap is published on npm, and it has an extra dependency Popper, so let’s install them both:

1
yarn add bootstrap @popperjs/core

You may also need the type definitions:

1
yarn add -D @types/bootstrap

Use Bootstrap CSS

Just add a line to your App.vue file and you are free to use Bootstrap CSS:

1
2
3
4
5
6
7
<script setup lang="ts">
import 'bootstrap/dist/css/bootstrap.min.css'
</script>

<template>
<button type="button" class="btn btn-primary">Primary</button>
</template>

You can also use Sass for further customization.

Read More

Migrate from hexo-deployer-git to GitHub Actions

TL;DR

Create .github/workflows/pages.yml in your master branch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
name: Update gh-pages

on:
push:
branches:
- master

jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: "12.22"
cache: yarn
- run: yarn install
- run: yarn build
- uses: peaceiris/actions-gh-pages@v3
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./public

Go to GitHub repo’s Settings > Pages, change source branch to gh-pages.

How it works

Read More

Deploy Flink Job Cluster on Kubernetes

Kubernetes is the trending container orchestration system that can be used to host various applications from web services to data processing jobs. Applications are packaged in self-contained, yet light-weight containers, and we declare how they should be deployed, how they scale, and how they expose as services. Flink is also a trending distributed computing framework that can run on a variety of platforms, including Kubernetes. Combining them will bring us robust and scalable deployments of data processing jobs, and more safely Flink can share a Kubernetes cluster with other services.

Flink on Kubernetes

When deploying Flink on Kubernetes, there are two options, session cluster and job cluster. Session cluster is like running a standalone Flink cluster on k8s that can accept multiple jobs and is suitable for short running tasks or ad-hoc queries. Job cluster, on the other hand, deploys a full set of Flink cluster for each individual job. We build container image for each job, and provide it with dedicated resources, so that jobs have less chance interfering with other, and can scale out independently. So this article will illustrate how to run a Flink job cluster on Kubernetes, the steps are:

  • Compile and package the Flink job jar.
  • Build a Docker image containing the Flink runtime and the job jar.
  • Create a Kubernetes Job for Flink JobManager.
  • Create a Kubernetes Service for this Job.
  • Create a Kubernetes Deployment for Flink TaskManagers.
  • Enable Flink JobManager HA with ZooKeeper.
  • Correctly stop and resume Flink job with SavePoint facility.

Read More

Understanding Hive ACID Transactional Table

Apache Hive introduced transactions since version 0.13 to fully support ACID semantics on Hive table, including INSERT/UPDATE/DELETE/MERGE statements, streaming data ingestion, etc. In Hive 3.0, this feature is further improved by optimizing the underlying data file structure, reducing constraints on table scheme, and supporting predicate push down and vectorized query. Examples and setup can be found on Hive wiki and other tutorials, while this article will focus on how transactional table is saved on HDFS, and take a closer look at the read-write process.

File Structure

Insert Data

1
2
3
4
5
6
7
CREATE TABLE employee (id int, name string, salary int)
STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

INSERT INTO employee VALUES
(1, 'Jerry', 5000),
(2, 'Tom', 8000),
(3, 'Kate', 6000);

An INSERT statement is executed in a single transaction. It will create a delta directory containing information about this transaction and its data.

1
2
3
/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000001_0000001_0000/_orc_acid_version
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000

The schema of this folder’s name is delta_minWID_maxWID_stmtID, i.e. “delta” prefix, transactional writes’ range (minimum and maximum write ID), and statement ID. In detail:

  • All INSERT statements will create a delta directory. UPDATE statement will also create delta directory right after a delete directory. delete directory is prefixed with “delete_delta”.
  • Hive will assign a globally unique ID for every transaction, both read and write. For transactional writes like INSERT and DELETE, it will also assign a table-wise unique ID, a.k.a. a write ID. The write ID range will be encoded in the delta and delete directory names.
  • Statement ID is used when multiple writes into the same table happen in one transaction.

Read More

Real-time Exactly-once ETL with Apache Flink

Apache Flink is another popular big data processing framework, which differs from Apache Spark in that Flink uses stream processing to mimic batch processing and provides sub-second latency along with exactly-once semantics. One of its use cases is to build a real-time data pipeline, move and transform data between different stores. This article will show you how to build such an application, and explain how Flink guarantees its correctness.

Apache Flink

Demo ETL Application

Let us build a project that extracts data from Kafka and loads them into HDFS. The result files should be stored in bucketed directories according to event time. Source messages are encoded in JSON, and the event time is stored as timestamp. Samples are:

1
2
3
{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

The result directory structure should be:

1
2
/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9

Read More

Spark DataSource API V2

From Spark 1.3, the team introduced a data source API to help quickly integrating various input formats with Spark SQL. But eventually this version of API became insufficient and the team needed to add a lot of internal codes to provide more efficient solutions for Spark SQL data sources. So in Spark 2.3, the second version of data source API is out, which is supposed to overcome the limitations of the previous version. In this article, I will demonstrate how to implement custom data source for Spark SQL in both V1 and V2 API, to help understanding their differences and the new API’s advantages.

DataSource V1 API

V1 API provides a set of abstract classes and traits. They are located in spark/sql/sources/interfaces.scala. Some basic APIs are:

1
2
3
4
5
6
7
8
9
10
11
12
trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
def sqlContext: SQLContext
def schema: StructType
}

trait TableScan {
def buildScan(): RDD[Row]
}

A RelationProvider defines a class that can create a relational data source for Spark SQL to manipulate with. It can initialize itself with provided options, such as file path or authentication. BaseRelation is used to define the data schema, which can be loaded from database, Parquet file, or specified by the user. This class also needs to mix-in one of the Scan traits, implements the buildScan method, and returns an RDD.

Read More

Flume Source Code: HDFS Sink

Sink is the last component of Apache Flume data flow, and it is used to output data into storages like local files, HDFS, ElasticSearch, etc. In this article, I will illustrate how Flume’s HDFS sink works, by analyzing its source code with diagrams.

Sink Component Lifecycle

In the previous article, we learnt that every Flume component implements LifecycleAware interface, and is started and monitored by LifecycleSupervisor. Sink component is not directly invoked by this supervisor, but wrapped in SinkRunner and SinkProcessor classes. Flume supports three different sink processors, to connect channel and sinks in different semantics. But here we only consider the DefaultSinkProcessor, that accepts only one sink, and we will skip the concept of sink group as well.

Sink Component LifeCycle

Read More

How to Avoid NullPointerException

NullPointerException happens when you dereference a possible null object without checking it. It’s a common exception that every Java programmer may encounter in daily work. There’re several strategies that can help us avoid this exception, making our codes more robust. In this article, I will list both traditional ways and those with tools and new features introduced by recent version of Java.

Runtime Check

The most obvious way is to use if (obj == null) to check every variable you need to use, either from function argument, return value, or instance field. When you receive a null object, you can throw a different, more informative exception like IllegalArgumentException. There are some library functions that can make this process easier, like Objects#requireNonNull:

1
2
3
4
public void testObjects(Object arg) {
Object checked = Objects.requireNonNull(arg, "arg must not be null");
checked.toString();
}

Or use Guava’s Preconditions package, which provides all kinds of arguments checking facilities:

1
2
3
4
public void testGuava(Object arg) {
Object checked = Preconditions.checkNotNull(arg, "%s must not be null", "arg");
checked.toString();
}

We can also let Lombok generate the check for us, which will throw a more meaningful NullPointerException:

1
2
3
public void testLombok(@NonNull Object arg) {
arg.toString();
}

The generated code and exception message are as follows:

1
2
3
4
5
6
public void testLombokGenerated(Object arg) {
if (arg == null) {
throw new NullPointerException("arg is marked @NonNull but is null");
}
arg.toString();
}

This annotation can also be added to a class field, and Lombok will check nullness for every assignment.

Read More