Inspection roll call votes / namentliche Abstimmungen data for "Fraktionszwang"
Get insight into "Fraktionszwang" using voting behavior data from the bundestag and abgeordnetenwatch.
TL;DR
- Measured diversity of votes by party using entropy.
- All parties have their members largely voting along party line, but not deviation is the norm. The only group voting close to radically random is those of the factionless.
Setup
Fraktionszwang should become evident by how diverse the votes are by one party across different polls.
To collect data, if not already present in ../data/preprocessed
run
uv run bundestag download huggingface
%load_ext autoreload
%autoreload 2
import polars as pl
from plotnine import (
ggplot,
aes,
geom_point,
labs,
scale_y_continuous,
facet_wrap,
theme,
geom_line,
scale_color_manual,
)
import os
from pathlib import Path
from bundestag.fine_logging import setup_logging
import logging
from bundestag.paths import get_paths
import math
from bundestag.data.transform.abgeordnetenwatch.transform import (
get_polls_parquet_path,
get_votes_parquet_path,
get_mandates_parquet_path,
)
logger = logging.getLogger(__name__)
def plot_poll_counts_over_time(df: pl.dataframe, x: str, y: str) -> ggplot:
return (
ggplot(df, aes("date", "n"))
+ geom_point()
+ labs(title="# polls per day over time", x="Date", y="# unique polls")
+ scale_y_continuous(breaks=[0, 2, 4, 6, 8, 10])
)
def plot_voting_members_per_poll_over_time(
df: pl.DataFrame, x: str, poll: str, member: str
) -> ggplot:
members_per_poll_per_day_over_time = df.group_by([x, poll]).agg(
pl.col(member).n_unique().alias("n")
)
return (
ggplot(members_per_poll_per_day_over_time, aes(x, "n"))
+ geom_point()
+ labs(
title="# Members voting per poll per day over time", x="Date", y="# members"
)
)
def compute_vote_shares(
df: pl.DataFrame, x: str, poll: str, party: str, vote: str, member: str
) -> pl.DataFrame:
member_votes_per_faction_per_poll_per_day_over_time = (
df.group_by([x, poll, party, vote])
.agg(pl.col(member).n_unique().alias("n"))
.sort(x, poll, party, vote)
)
return member_votes_per_faction_per_poll_per_day_over_time.with_columns(
(pl.col("n") / pl.sum("n").over([x, poll, party])).alias("vote share")
)
def plot_voting_shares_over_time(
df: pl.DataFrame, x: str, party: str, colors: scale_color_manual
) -> ggplot:
return (
ggplot(
df,
aes(x, "vote share", color="vote"),
)
+ geom_point(alpha=0.3)
+ labs(
title="Voting shares per poll per day over time",
x="Date",
y="Vote fraction",
)
+ facet_wrap(party, ncol=1)
+ scale_y_continuous(limits=(0, 1), breaks=[0, 0.25, 0.5, 0.75, 1.0])
+ theme(figure_size=(10, 16), subplots_adjust={"hspace": 0.35})
+ colors
)
def get_max_entropy(df: pl.DataFrame, col: str) -> float:
return -math.log2(1 / df[col].n_unique())
def compute_entropies(
df: pl.DataFrame, t: str, poll: str, party: str, vote: str
) -> pl.DataFrame:
max_entropy = get_max_entropy(df, vote)
return (
df.with_columns(**{"log p": pl.col("vote share").log(base=2)})
.group_by([t, poll, party])
.agg(
**{
"shannon entropy": -pl.when(pl.col("vote share") > 0)
.then(pl.col("vote share") * pl.col("log p"))
.otherwise(0)
.sum()
}
)
.with_columns(
**{
"share of max shannon entropy [%]": pl.col("shannon entropy")
/ max_entropy
}
)
)
def compute_rolling_median(
df: pl.DataFrame,
n: int,
x: str,
party: str,
y: str = "share of max shannon entropy [%]",
) -> pl.DataFrame:
return df.sort(x).with_columns(
pl.col(y).rolling_median(window_size=n).over(party).alias("rolling_median")
)
def plot_entropy_over_time(
df: pl.DataFrame,
party_colors: scale_color_manual,
x: str = "date",
y: str = "share of max shannon entropy [%]",
color: str = "party",
) -> ggplot:
return (
ggplot(df, aes(x, y, color=color))
+ geom_point(alpha=0.3)
+ labs(
title="Voting entropy per poll per day over time",
x="Date",
y="Shannon entropy (smaller = more Fraktionszwang)",
)
+ facet_wrap(color, ncol=1)
+ theme(figure_size=(10, 16), subplots_adjust={"hspace": 0.35})
+ party_colors
+ scale_y_continuous(labels=lambda v: [f"{x * 100:.0f}%" for x in v])
)
def plot_rolling_entropy_over_time(
df: pl.DataFrame,
party_colors: scale_color_manual,
n_polls_to_average: int,
x: str = "date",
y: str = "rolling_median",
color: str = "party",
) -> ggplot:
return (
ggplot(df, aes(x=x, color=color))
+ geom_line(aes(y=y))
+ labs(
title=f"Relative voting entropy per poll per day over time with rolling median (n={n_polls_to_average})",
x="Date",
y="Share of Shannon entropy relative to maximum (smaller = more Fraktionszwang)",
)
+ theme(figure_size=(8, 6), subplots_adjust={"hspace": 0.35})
+ party_colors
+ scale_y_continuous(
labels=lambda v: [f"{x * 100:.0f}%" for x in v], limits=(0, 1)
)
)
# if this notebook is run via `make docs` then the environment variable is set
makedocs = os.getenv("MAKEDOCS") is not None
logger.info(f"Running nb with {makedocs=}")
setup_logging(logging.INFO)
_fig_path = Path("./images")
paths = get_paths("../data")
paths
Bundestag sheet data
Collect data
file = paths.preprocessed_bundestag / "bundestag.de_votes.parquet"
file
data_bundestag = pl.read_parquet(file)
data_bundestag.head()
Clean the data
Use a single name for "Die Linke"
data_bundestag["Fraktion/Gruppe"].value_counts()
data_bundestag = data_bundestag.with_columns(
**{
"Fraktion/Gruppe": pl.when(pl.col("Fraktion/Gruppe").eq(pl.lit("DIE LINKE.")))
.then(pl.lit("Die Linke"))
.otherwise(pl.col("Fraktion/Gruppe"))
}
)
data_bundestag["Fraktion/Gruppe"].value_counts()
Plot metrics over time
How many things are voted on per day over time?
things_per_day_over_time = data_bundestag.group_by("date").agg(
pl.col("Abstimmnr").n_unique().alias("n")
)
things_per_day_over_time.head()
p = plot_poll_counts_over_time(things_per_day_over_time, "date", "n")
p.show()
if makedocs:
p.save(_fig_path / "bundestag_sheets_polls_over_time.png")
How many members vote per poll over time?
p = plot_voting_members_per_poll_over_time(
data_bundestag, "date", "Abstimmnr", "Bezeichnung"
)
p.show()
if makedocs:
p.save(_fig_path / "bundestag_sheets_members_per_poll_over_time.png")
Count of vote types by date, poll and party over time.
member_votes_per_faction_per_poll_per_day_over_time = compute_vote_shares(
data_bundestag, "date", "Abstimmnr", "Fraktion/Gruppe", "vote", "Bezeichnung"
)
member_votes_per_faction_per_poll_per_day_over_time.head()
colors = scale_color_manual(
breaks=["ja", "nein", "nichtabgegeben", "Enthaltung"],
values=["green", "red", "grey", "orange"],
)
p = plot_voting_shares_over_time(
member_votes_per_faction_per_poll_per_day_over_time,
"date",
"Fraktion/Gruppe",
colors,
)
p.show()
if makedocs:
p.save(_fig_path / "bundestag_sheets_voting_shares_over_time.png")
entropy_per_poll_faction = compute_entropies(
member_votes_per_faction_per_poll_per_day_over_time,
"date",
"Abstimmnr",
"Fraktion/Gruppe",
"vote",
)
entropy_per_poll_faction.head(2)
party_colors = scale_color_manual(
breaks=[
"AfD",
"BSW",
"BÜ90/GR",
"CDU/CSU",
"Die Linke",
"FDP",
"Fraktionslos",
"SPD",
],
values=["blue", "purple", "green", "black", "red", "yellow", "grey", "salmon"],
)
p = plot_entropy_over_time(
entropy_per_poll_faction, party_colors, x="date", color="Fraktion/Gruppe"
)
p.show()
if makedocs:
p.save(_fig_path / "bundestag_sheets_voting_entropy_over_time.png")
Now we compute the rolling median of shannon entropy
over n_polls_to_average
polls for each Fraktion/Gruppe
.
n_polls_to_average = 30
entropy_per_poll_faction = compute_rolling_median(
entropy_per_poll_faction, n_polls_to_average, "date", "Fraktion/Gruppe"
)
entropy_per_poll_faction.head()
Now let's plot the original shannon entropy
and the shannon_entropy_rolling_median
to see the effect of the rolling median.
p = plot_rolling_entropy_over_time(
entropy_per_poll_faction,
party_colors,
n_polls_to_average,
"date",
color="Fraktion/Gruppe",
)
p.show()
if makedocs:
p.save(_fig_path / "bundestag_sheets_rolling_voting_entropy_over_time.png")
Abgeordnetenwatch.de data
Collect data
First, collect data for the individual legislative periods
legislature_ids = [67, 83, 97, 111, 132, 161]
tmp = []
for legislature_id in legislature_ids:
p = get_polls_parquet_path(legislature_id, paths.preprocessed_abgeordnetenwatch)
if not p.exists():
continue
_mandates = pl.read_parquet(p)
_mandates = _mandates.with_columns(**{"legislature_id": legislature_id})
tmp.append(_mandates)
polls = pl.concat(tmp, how="diagonal_relaxed")
polls.head(2), polls.tail(2)
tmp = []
for legislature_id in legislature_ids:
p = get_votes_parquet_path(legislature_id, paths.preprocessed_abgeordnetenwatch)
if not p.exists():
continue
_mandates = pl.read_parquet(p)
_mandates = _mandates.with_columns(**{"legislature_id": legislature_id})
tmp.append(_mandates)
votes = pl.concat(tmp, how="diagonal_relaxed")
votes.head(2), votes.tail(2)
tmp = []
for legislature_id in legislature_ids:
p = get_mandates_parquet_path(legislature_id, paths.preprocessed_abgeordnetenwatch)
if not p.exists():
continue
_mandates = pl.read_parquet(p)
print(len(_mandates))
_mandates = _mandates.with_columns(**{"legislature_id": legislature_id})
tmp.append(_mandates)
mandates = pl.concat(tmp, how="diagonal_relaxed")
mandates.head(2), mandates.tail(2)
data_abgeordnetenwatch = polls.join(
votes, on=["legislature_id", "poll_id"], how="left"
).join(mandates, on=["legislature_id", "mandate_id"], how="left")
data_abgeordnetenwatch = data_abgeordnetenwatch.with_columns(
**{"date": pl.col("poll_date").str.to_date(format="%Y-%m-%d")}
)
with pl.Config(tbl_rows=15):
display(data_abgeordnetenwatch["party"].value_counts(sort=True))
Clean the data
data_abgeordnetenwatch = data_abgeordnetenwatch.with_columns(
**{
"party": pl.when(
pl.col("party").is_in(pl.lit(["DIE LINKE", "Die Linke. (Gruppe)"]))
)
.then(pl.lit("Die Linke"))
.otherwise(pl.col("party"))
}
).with_columns(
**{
"party": pl.when(pl.col("party").is_in(pl.lit(["DIE GRÜNEN"])))
.then(pl.lit("BÜNDNIS 90/DIE GRÜNEN"))
.otherwise(pl.col("party"))
}
)
with pl.Config(tbl_rows=15):
display(data_abgeordnetenwatch["party"].value_counts(sort=True))
data_abgeordnetenwatch.filter(pl.col("party").is_null()).group_by("legislature_id").agg(
**{"mandates": pl.col("mandate_id").n_unique()}
)
Plot metrics over time
things_per_day_over_time = data_abgeordnetenwatch.group_by("date").agg(
pl.col("poll_id").n_unique().alias("n")
)
things_per_day_over_time.head()
p = plot_poll_counts_over_time(things_per_day_over_time, "date", "n")
p.show()
if makedocs:
p.save(_fig_path / "abgeordnetenwatch_polls_over_time.png")
p = plot_voting_members_per_poll_over_time(
data_abgeordnetenwatch, "date", "poll_id", "mandate_id"
)
p.show()
if makedocs:
p.save(_fig_path / "abgeordnetenwatch_members_per_poll_over_time.png")
member_votes_per_faction_per_poll_per_day_over_time = compute_vote_shares(
data_abgeordnetenwatch, "date", "poll_id", "party", "vote", "mandate_id"
)
colors = scale_color_manual(
breaks=["yes", "no", "no_show", "abstain"],
values=["green", "red", "grey", "orange"],
)
p = plot_voting_shares_over_time(
member_votes_per_faction_per_poll_per_day_over_time, "date", "party", colors
)
p.show()
if makedocs:
p.save(_fig_path / "abgeordnetenwatch_voting_shares_over_time.png")
entropy_per_poll_faction = compute_entropies(
member_votes_per_faction_per_poll_per_day_over_time,
"date",
"poll_id",
"party",
"vote",
)
entropy_per_poll_faction.head()
party_colors = scale_color_manual(
breaks=[
"AfD",
"BSW (Gruppe)",
"BÜNDNIS 90/DIE GRÜNEN",
"CDU/CSU",
"Die Linke",
"FDP",
"fraktionslos",
"SPD",
],
values=["blue", "purple", "green", "black", "red", "yellow", "grey", "salmon"],
)
p = plot_entropy_over_time(entropy_per_poll_faction, party_colors)
p.show()
if makedocs:
p.save(_fig_path / "abgeordnetenwatch_voting_entropy_over_time.png")
n_polls_to_average = 30
entropy_per_poll_faction = compute_rolling_median(
entropy_per_poll_faction,
n_polls_to_average,
"date",
"party",
y="share of max shannon entropy [%]",
)
p = plot_rolling_entropy_over_time(
entropy_per_poll_faction.filter(pl.col("party").is_not_null()),
party_colors,
n_polls_to_average,
)
p.show()
if makedocs:
p.save(_fig_path / "abgeordnetenwatch_rolling_voting_entropy_over_time.png")